diff --git a/cmd/config.go b/cmd/config.go index 62c99005a..d38291d57 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -230,8 +230,13 @@ type SyslogConfig struct { // ServiceDomain contains the service and domain name the gRPC or bdns provider // will use to construct a SRV DNS query to lookup backends. type ServiceDomain struct { + // Service is the service name to be used for SRV lookups. For example: if + // record is 'foo.service.consul', then the Service is 'foo'. Service string `validate:"required"` - Domain string `validate:"required"` + + // Domain is the domain name to be used for SRV lookups. For example: if the + // record is 'foo.service.consul', then the Domain is 'service.consul'. + Domain string `validate:"required"` } // GRPCClientConfig contains the information necessary to setup a gRPC client diff --git a/redis/lookup.go b/redis/lookup.go new file mode 100644 index 000000000..7becdb1b2 --- /dev/null +++ b/redis/lookup.go @@ -0,0 +1,216 @@ +package redis + +import ( + "context" + "errors" + "fmt" + "net" + "strings" + "time" + + "github.com/letsencrypt/boulder/cmd" + blog "github.com/letsencrypt/boulder/log" + + "github.com/redis/go-redis/v9" +) + +var ErrNoShardsResolved = errors.New("0 shards were resolved") + +// Lookup wraps a Redis ring client by reference and keeps the Redis ring shards +// up to date via periodic SRV lookups. +type Lookup struct { + // srvLookups is a list of SRV records to be looked up. + srvLookups []cmd.ServiceDomain + + // updateFrequency is the frequency of periodic SRV lookups. Defaults to 30 + // seconds. + updateFrequency time.Duration + + // updateTimeout is the timeout for each SRV lookup. Defaults to 90% of the + // update frequency. + updateTimeout time.Duration + + // dnsAuthority is the single : of the DNS + // server to be used for SRV lookups. If the address contains a hostname it + // will be resolved via the system DNS. If the port is left unspecified it + // will default to '53'. If this field is left unspecified the system DNS + // will be used for resolution. + dnsAuthority string + + resolver *net.Resolver + ring *redis.Ring + logger blog.Logger +} + +// NewLookup constructs and returns a new Lookup instance. An initial SRV lookup +// is performed to populate the Redis ring shards. If this lookup fails or +// otherwise results in an empty set of resolved shards, an error is returned. +func NewLookup(srvLookups []cmd.ServiceDomain, dnsAuthority string, frequency time.Duration, ring *redis.Ring, logger blog.Logger) (*Lookup, error) { + updateFrequency := frequency + if updateFrequency <= 0 { + // Set default frequency. + updateFrequency = 30 * time.Second + } + // Set default timeout to 90% of the update frequency. + updateTimeout := updateFrequency - updateFrequency/10 + + var lookup *Lookup + if dnsAuthority == "" { + // Use the system DNS resolver. + lookup = &Lookup{ + srvLookups: srvLookups, + ring: ring, + logger: logger, + updateFrequency: updateFrequency, + updateTimeout: updateTimeout, + resolver: net.DefaultResolver, + dnsAuthority: dnsAuthority, + } + } else { + // Setup a custom DNS resolver. + lookup = &Lookup{ + srvLookups: srvLookups, + ring: ring, + logger: logger, + updateFrequency: updateFrequency, + updateTimeout: updateTimeout, + dnsAuthority: dnsAuthority, + } + + host, port, err := net.SplitHostPort(dnsAuthority) + if err != nil { + // Assume only hostname or IPv4 address was specified. + host = dnsAuthority + port = "53" + } + lookup.dnsAuthority = net.JoinHostPort(host, port) + lookup.resolver = &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { + // The custom resolver closes over the lookup.dnsAuthority field + // so it can be swapped out in testing. + return net.Dial(network, lookup.dnsAuthority) + }, + } + } + + ctx, cancel := context.WithTimeout(context.Background(), updateTimeout) + defer cancel() + tempErr, nonTempErr := lookup.updateNow(ctx) + + if tempErr != nil { + // Log and discard temporary errors, as they're likely to be transient + // (e.g. network connectivity issues). + logger.Warningf("resolving ring shards: %s", tempErr) + } + if nonTempErr != nil && errors.Is(nonTempErr, ErrNoShardsResolved) { + // Non-temporary errors are always logged inside of updateNow(), so we + // only need return the error here if it's ErrNoShardsResolved. + return nil, nonTempErr + } + + return lookup, nil +} + +// updateNow resolves and updates the Redis ring shards accordingly. If all +// lookups fail or otherwise result in an empty set of resolved shards, the +// Redis ring is left unmodified and any errors are returned. If at least one +// lookup succeeds, the Redis ring is updated, and all errors are discarded. +// Non-temporary DNS errors are always logged as they occur, as they're likely +// to be indicative of a misconfiguration. +func (look *Lookup) updateNow(ctx context.Context) (tempError, nonTempError error) { + var tempErrs []error + handleDNSError := func(err error, srv cmd.ServiceDomain) { + var dnsErr *net.DNSError + if errors.As(err, &dnsErr) && (dnsErr.IsTimeout || dnsErr.IsTemporary) { + tempErrs = append(tempErrs, err) + return + } + // Log non-temporary DNS errors as they occur, as they're likely to be + // indicative of misconfiguration. + look.logger.Errf("resolving service _%s._tcp.%s: %s", srv.Service, srv.Domain, err) + } + + nextAddrs := make(map[string]string) + for _, srv := range look.srvLookups { + _, targets, err := look.resolver.LookupSRV(ctx, srv.Service, "tcp", srv.Domain) + if err != nil { + handleDNSError(err, srv) + // Skip to the next SRV lookup. + continue + } + if len(targets) <= 0 { + tempErrs = append(tempErrs, fmt.Errorf("0 targets resolved for service \"_%s._tcp.%s\"", srv.Service, srv.Domain)) + // Skip to the next SRV lookup. + continue + } + + for _, target := range targets { + host := strings.TrimRight(target.Target, ".") + if look.dnsAuthority != "" { + // Lookup A/AAAA records for the SRV target using the custom DNS + // authority. + hostAddrs, err := look.resolver.LookupHost(ctx, host) + if err != nil { + handleDNSError(err, srv) + // Skip to the next A/AAAA lookup. + continue + } + if len(hostAddrs) <= 0 { + tempErrs = append(tempErrs, fmt.Errorf("0 addrs resolved for target %q of service \"_%s._tcp.%s\"", host, srv.Service, srv.Domain)) + // Skip to the next A/AAAA lookup. + continue + } + // Use the first resolved IP address. + host = hostAddrs[0] + } + addr := fmt.Sprintf("%s:%d", host, target.Port) + nextAddrs[addr] = addr + } + } + + // Only return errors if we failed to resolve any shards. + if len(nextAddrs) <= 0 { + return errors.Join(tempErrs...), ErrNoShardsResolved + } + + // Some shards were resolved, update the Redis ring and discard all errors. + look.ring.SetAddrs(nextAddrs) + return nil, nil +} + +// Start starts a goroutine that keeps the Redis ring shards up to date via +// periodic SRV lookups. The goroutine will exit when the provided context is +// cancelled. +func (look *Lookup) Start(ctx context.Context) { + go func() { + ticker := time.NewTicker(look.updateFrequency) + defer ticker.Stop() + for { + // Check for context cancellation before we do any work. + if ctx.Err() != nil { + return + } + + timeoutCtx, cancel := context.WithTimeout(ctx, look.updateTimeout) + tempErrs, nonTempErrs := look.updateNow(timeoutCtx) + cancel() + if tempErrs != nil { + look.logger.Warningf("resolving ring shards, temporary errors: %s", tempErrs) + continue + } + if nonTempErrs != nil { + look.logger.Errf("resolving ring shards, non-temporary errors: %s", nonTempErrs) + continue + } + + select { + case <-ticker.C: + continue + + case <-ctx.Done(): + return + } + } + }() +} diff --git a/redis/lookup_test.go b/redis/lookup_test.go new file mode 100644 index 000000000..fd923d46c --- /dev/null +++ b/redis/lookup_test.go @@ -0,0 +1,248 @@ +package redis + +import ( + "context" + "testing" + "time" + + "github.com/letsencrypt/boulder/cmd" + blog "github.com/letsencrypt/boulder/log" + "github.com/letsencrypt/boulder/metrics" + "github.com/letsencrypt/boulder/test" + + "github.com/redis/go-redis/v9" +) + +func newTestRedisRing() *redis.Ring { + CACertFile := "../test/redis-tls/minica.pem" + CertFile := "../test/redis-tls/boulder/cert.pem" + KeyFile := "../test/redis-tls/boulder/key.pem" + tlsConfig := cmd.TLSConfig{ + CACertFile: CACertFile, + CertFile: CertFile, + KeyFile: KeyFile, + } + tlsConfig2, err := tlsConfig.Load(metrics.NoopRegisterer) + if err != nil { + panic(err) + } + + client := redis.NewRing(&redis.RingOptions{ + Username: "unittest-rw", + Password: "824968fa490f4ecec1e52d5e34916bdb60d45f8d", + TLSConfig: tlsConfig2, + }) + return client +} + +func TestNewLookup(t *testing.T) { + t.Parallel() + + logger := blog.NewMock() + ring := newTestRedisRing() + + _, err := NewLookup([]cmd.ServiceDomain{ + { + Service: "redisratelimits", + Domain: "service.consul", + }, + }, + "consul.service.consul", + 250*time.Millisecond, + ring, + logger, + ) + test.AssertNotError(t, err, "Expected NewLookup construction to succeed") +} + +func TestStart(t *testing.T) { + t.Parallel() + + logger := blog.NewMock() + ring := newTestRedisRing() + + lookup, err := NewLookup([]cmd.ServiceDomain{ + { + Service: "redisratelimits", + Domain: "service.consul", + }, + }, + "consul.service.consul", + 250*time.Millisecond, + ring, + logger, + ) + test.AssertNotError(t, err, "Expected NewLookup construction to succeed") + + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + lookup.Start(testCtx) +} + +func TestNewLookupWithOneFailingSRV(t *testing.T) { + t.Parallel() + + logger := blog.NewMock() + ring := newTestRedisRing() + + _, err := NewLookup([]cmd.ServiceDomain{ + { + Service: "doesnotexist", + Domain: "service.consuls", + }, + { + Service: "redisratelimits", + Domain: "service.consul", + }, + }, + "consul.service.consul", + 250*time.Millisecond, + ring, + logger, + ) + test.AssertNotError(t, err, "Expected NewLookup construction to succeed") +} + +func TestNewLookupWithAllFailingSRV(t *testing.T) { + t.Parallel() + + logger := blog.NewMock() + ring := newTestRedisRing() + + _, err := NewLookup([]cmd.ServiceDomain{ + { + Service: "doesnotexist", + Domain: "service.consuls", + }, + { + Service: "doesnotexist2", + Domain: "service.consuls", + }, + }, + "consul.service.consul", + 250*time.Millisecond, + ring, + logger, + ) + test.AssertError(t, err, "Expected NewLookup construction to fail") +} + +func TestUpdateNowWithAllFailingSRV(t *testing.T) { + t.Parallel() + + logger := blog.NewMock() + ring := newTestRedisRing() + + lookup, err := NewLookup([]cmd.ServiceDomain{ + { + Service: "redisratelimits", + Domain: "service.consul", + }, + }, + "consul.service.consul", + 250*time.Millisecond, + ring, + logger, + ) + test.AssertNotError(t, err, "Expected NewLookup construction to succeed") + + lookup.srvLookups = []cmd.ServiceDomain{ + { + Service: "doesnotexist1", + Domain: "service.consul", + }, + { + Service: "doesnotexist2", + Domain: "service.consul", + }, + } + + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tempErr, nonTempErr := lookup.updateNow(testCtx) + test.AssertNotError(t, tempErr, "Expected no temporary errors") + test.AssertError(t, nonTempErr, "Expected non-temporary errors to have occurred") +} + +func TestUpdateNowWithAllFailingSRVs(t *testing.T) { + t.Parallel() + + logger := blog.NewMock() + ring := newTestRedisRing() + + lookup, err := NewLookup([]cmd.ServiceDomain{ + { + Service: "redisratelimits", + Domain: "service.consul", + }, + }, + "consul.service.consul", + 250*time.Millisecond, + ring, + logger, + ) + test.AssertNotError(t, err, "Expected NewLookup construction to succeed") + + // Replace the dnsAuthority with a non-existent DNS server, this will cause + // a timeout error, which is technically a temporary error, but will + // eventually result in a non-temporary error when no shards are resolved. + lookup.dnsAuthority = "consuls.services.consuls:53" + + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + tempErr, nonTempErr := lookup.updateNow(testCtx) + test.AssertError(t, tempErr, "Expected temporary errors") + test.AssertError(t, nonTempErr, "Expected a non-temporary error") + test.AssertErrorIs(t, nonTempErr, ErrNoShardsResolved) +} + +func TestUpdateNowWithOneFailingSRV(t *testing.T) { + t.Parallel() + + logger := blog.NewMock() + ring := newTestRedisRing() + + lookup, err := NewLookup([]cmd.ServiceDomain{ + { + Service: "doesnotexist", + Domain: "service.consuls", + }, + { + Service: "redisratelimits", + Domain: "service.consul", + }, + }, + "consul.service.consul", + 250*time.Millisecond, + ring, + logger, + ) + test.AssertNotError(t, err, "Expected NewLookup construction to succeed") + + // The Consul service entry for 'redisratelimits' is configured to return + // two SRV targets. We should only have two shards in the ring. + test.Assert(t, ring.Len() == 2, "Expected 2 shards in the ring") + + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Ensure we can reach both shards using the PING command. + err = ring.ForEachShard(testCtx, func(ctx context.Context, shard *redis.Client) error { + return shard.Ping(ctx).Err() + }) + test.AssertNotError(t, err, "Expected PING to succeed for both shards") + + // Drop both Shards from the ring. + ring.SetAddrs(map[string]string{}) + test.Assert(t, ring.Len() == 0, "Expected 0 shards in the ring") + + // Force a lookup to occur. + tempErr, nonTempErr := lookup.updateNow(testCtx) + test.AssertNotError(t, tempErr, "Expected no temporary errors") + test.AssertNotError(t, nonTempErr, "Expected no non-temporary errors") + + // The ring should now have two shards again. + test.Assert(t, ring.Len() == 2, "Expected 2 shards in the ring") +} diff --git a/test/consul/config.hcl b/test/consul/config.hcl index dab281b19..f1dbb0f66 100644 --- a/test/consul/config.hcl +++ b/test/consul/config.hcl @@ -349,6 +349,22 @@ services { tags = ["tcp"] // Required for SRV RR support in gRPC DNS resolution. } +services { + id = "bredis3" + name = "redisratelimits" + address = "10.33.33.4" + port = 4218 + tags = ["tcp"] // Required for SRV RR support in DNS resolution. +} + +services { + id = "bredis4" + name = "redisratelimits" + address = "10.33.33.5" + port = 4218 + tags = ["tcp"] // Required for SRV RR support in DNS resolution. +} + // // The following services are used for testing the gRPC DNS resolver. //