grpc/sa: Implement deep health checks (#6928)
Add the necessary scaffolding for deep health checking of our various gRPC components. Each component implementation that also implements the grpc.checker interface will be checked periodically, and the health status of the component will be updated accordingly. Add the necessary methods to the SA to implement the grpc.checker interface and register these new health checks with Consul.

Additionally:
- Update the entrypoint script to check for ProxySQL readiness.
- Increase the poll rate for gRPC Consul checks from 5s to 2s to help with DNS failures, due to check failures, on startup.
- Change the log level for Consul from INFO to ERROR to deal with noisy logs full of transport failures due to Consul gRPC checks firing before the SAs are up.

Fixes #6878
Part of #6795
parent e18507a43c
commit 124c4cc6f5
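As a sketch of what the new scaffolding enables (the type and its field below are hypothetical; the method signature matches the grpc.checker interface introduced in grpc/server.go), a component opts into deep health checking simply by exposing a Health method:

package example

import (
	"context"
	"database/sql"
)

// myServiceImpl stands in for any Boulder gRPC service implementation
// (hypothetical name; the real implementations added here live in sa/sa.go).
type myServiceImpl struct {
	db *sql.DB
}

// Health satisfies the unexported grpc.checker interface: return nil when
// healthy, or an error to have the service flipped to NOT_SERVING. A cheap
// ping is analogous to the SELECT 1 probes the SA implementations run.
func (s *myServiceImpl) Health(ctx context.Context) error {
	return s.db.PingContext(ctx)
}

The server builder detects this method via a type assertion at Build time and spawns a long-running check goroutine per matching service.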
@@ -417,7 +417,7 @@ func daemon(c Config, ap *akamaiPurger, logger blog.Logger, scope prometheus.Reg
 		}
 	}()

-	start, err := bgrpc.NewServer(c.AkamaiPurger.GRPC).Add(
+	start, err := bgrpc.NewServer(c.AkamaiPurger.GRPC, logger).Add(
 		&akamaipb.AkamaiPurger_ServiceDesc, ap).Build(tlsConfig, scope, clk)
 	cmd.FailOnError(err, "Unable to setup Akamai purger gRPC server")
@@ -250,7 +250,7 @@ func main() {
 		logger.Infof("Created a reloadable allow list, it was initialized with %d entries", entries)
 	}

-	srv := bgrpc.NewServer(c.CA.GRPCCA)
+	srv := bgrpc.NewServer(c.CA.GRPCCA, logger)

 	if !c.CA.DisableOCSPService {
 		ocspi, err := ca.NewOCSPImpl(
@@ -94,7 +94,7 @@ func main() {

 	pubi := publisher.New(bundles, c.Publisher.UserAgent, logger, scope)

-	start, err := bgrpc.NewServer(c.Publisher.GRPC).Add(
+	start, err := bgrpc.NewServer(c.Publisher.GRPC, logger).Add(
 		&pubpb.Publisher_ServiceDesc, pubi).Build(tlsConfig, scope, clk)
 	cmd.FailOnError(err, "Unable to setup Publisher gRPC server")
@@ -257,7 +257,7 @@ func main() {
 	rai.OCSP = ocspc
 	rai.SA = sac

-	start, err := bgrpc.NewServer(c.RA.GRPC).Add(
+	start, err := bgrpc.NewServer(c.RA.GRPC, logger).Add(
 		&rapb.RegistrationAuthority_ServiceDesc, rai).Build(tlsConfig, scope, clk)
 	cmd.FailOnError(err, "Unable to setup RA gRPC server")
@@ -94,7 +94,7 @@ func main() {
 	sai, err := sa.NewSQLStorageAuthorityWrapping(saroi, dbMap, scope)
 	cmd.FailOnError(err, "Failed to create SA impl")

-	start, err := bgrpc.NewServer(c.SA.GRPC).Add(
+	start, err := bgrpc.NewServer(c.SA.GRPC, logger).WithCheckInterval(c.SA.HealthCheckInterval.Duration).Add(
 		&sapb.StorageAuthorityReadOnly_ServiceDesc, saroi).Add(
 		&sapb.StorageAuthority_ServiceDesc, sai).Build(
 		tls, scope, clk)
@@ -155,7 +155,7 @@ func main() {
 		c.VA.AccountURIPrefixes)
 	cmd.FailOnError(err, "Unable to create VA server")

-	start, err := bgrpc.NewServer(c.VA.GRPC).Add(
+	start, err := bgrpc.NewServer(c.VA.GRPC, logger).Add(
 		&vapb.VA_ServiceDesc, vai).Add(
 		&vapb.CAA_ServiceDesc, vai).Build(tlsConfig, scope, clk)
 	cmd.FailOnError(err, "Unable to setup VA gRPC server")
@@ -43,6 +43,10 @@ type ServiceConfig struct {
 	DebugAddr string `validate:"hostname_port"`
 	GRPC      *GRPCServerConfig
 	TLS       TLSConfig
+
+	// HealthCheckInterval is the duration between deep health checks of the
+	// service. Defaults to 5 seconds.
+	HealthCheckInterval config.Duration `validate:"-"`
 }

 // DBConfig defines how to connect to a database. The connect string is
@@ -125,7 +125,7 @@ func main() {
 	csi, err := storer.New(issuers, s3client, c.CRLStorer.S3Bucket, scope, logger, clk)
 	cmd.FailOnError(err, "Failed to create CRLStorer impl")

-	start, err := bgrpc.NewServer(c.CRLStorer.GRPC).Add(
+	start, err := bgrpc.NewServer(c.CRLStorer.GRPC, logger).Add(
 		&cspb.CRLStorer_ServiceDesc, csi).Build(tlsConfig, scope, clk)
 	cmd.FailOnError(err, "Unable to setup CRLStorer gRPC server")
@@ -112,7 +112,7 @@ func main() {
 	cmd.FailOnError(err, "tlsConfig config")

 	nonceServer := nonce.NewServer(ns)
-	start, err := bgrpc.NewServer(c.NonceService.GRPC).Add(
+	start, err := bgrpc.NewServer(c.NonceService.GRPC, logger).Add(
 		&noncepb.NonceService_ServiceDesc, nonceServer).Build(tlsConfig, scope, cmd.Clock())
 	cmd.FailOnError(err, "Unable to setup nonce service gRPC server")
grpc/server.go
@@ -1,15 +1,18 @@
 package grpc

 import (
+	"context"
 	"crypto/tls"
 	"errors"
 	"fmt"
 	"net"
 	"strings"
+	"time"

 	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
 	"github.com/jmhodges/clock"
 	bcreds "github.com/letsencrypt/boulder/grpc/creds"
+	blog "github.com/letsencrypt/boulder/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
 	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters"
@@ -27,6 +30,15 @@ var CodedError = status.Errorf

 var errNilTLS = errors.New("boulder/grpc: received nil tls.Config")

+// checker is an interface for checking the health of a grpc service
+// implementation.
+type checker interface {
+	// Health returns nil if the service is healthy, or an error if it is not.
+	// If the passed context is canceled, it should return immediately with an
+	// error.
+	Health(context.Context) error
+}
+
 // service represents a single gRPC service that can be registered with a gRPC
 // server.
 type service struct {
@@ -37,15 +49,27 @@ type service struct {
 // serverBuilder implements a builder pattern for constructing new gRPC servers
 // and registering gRPC services on those servers.
 type serverBuilder struct {
-	cfg      *cmd.GRPCServerConfig
-	services map[string]service
-	err      error
+	cfg           *cmd.GRPCServerConfig
+	services      map[string]service
+	healthSrv     *health.Server
+	checkInterval time.Duration
+	logger        blog.Logger
+	err           error
 }

-// NewServer returns an object which can be used to build gRPC servers. It
-// takes the server's configuration to perform initialization.
-func NewServer(c *cmd.GRPCServerConfig) *serverBuilder {
-	return &serverBuilder{cfg: c, services: make(map[string]service)}
+// NewServer returns an object which can be used to build gRPC servers. It takes
+// the server's configuration to perform initialization and a logger for deep
+// health checks.
+func NewServer(c *cmd.GRPCServerConfig, logger blog.Logger) *serverBuilder {
+	return &serverBuilder{cfg: c, services: make(map[string]service), logger: logger}
 }

+// WithCheckInterval sets the interval at which the server will check the health
+// of its registered services. If this is not called, a default interval of 5
+// seconds will be used.
+func (sb *serverBuilder) WithCheckInterval(i time.Duration) *serverBuilder {
+	sb.checkInterval = i
+	return sb
+}
+
 // Add registers a new service (consisting of its description and its
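For reference, a usage sketch of the extended builder chain, modeled on the cmd/boulder-sa/main.go change above (the helper name and its parameters are illustrative, not a new API; WithCheckInterval may be omitted to accept the 5-second default):

package example

import (
	"crypto/tls"
	"time"

	"github.com/jmhodges/clock"
	"github.com/letsencrypt/boulder/cmd"
	bgrpc "github.com/letsencrypt/boulder/grpc"
	blog "github.com/letsencrypt/boulder/log"
	sapb "github.com/letsencrypt/boulder/sa/proto"
	"github.com/prometheus/client_golang/prometheus"
)

// buildSAServer is a hypothetical helper showing the chain; the interval
// must be set between NewServer and Build to take effect.
func buildSAServer(cfg *cmd.GRPCServerConfig, logger blog.Logger, saroi sapb.StorageAuthorityReadOnlyServer, sai sapb.StorageAuthorityServer, tlsConfig *tls.Config, scope prometheus.Registerer, clk clock.Clock) (func() error, error) {
	return bgrpc.NewServer(cfg, logger).
		WithCheckInterval(2 * time.Second). // omit for the 5s default
		Add(&sapb.StorageAuthorityReadOnly_ServiceDesc, saroi).
		Add(&sapb.StorageAuthority_ServiceDesc, sai).
		Build(tlsConfig, scope, clk)
}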
@@ -59,8 +83,7 @@ func (sb *serverBuilder) Add(desc *grpc.ServiceDesc, impl any) *serverBuilder {
 		sb.err = fmt.Errorf("attempted double-registration of gRPC service %q", desc.ServiceName)
 		return sb
 	}
-
-	sb.services[desc.ServiceName] = service{desc, impl}
+	sb.services[desc.ServiceName] = service{desc: desc, impl: impl}
 	return sb
 }

@@ -71,9 +94,9 @@ func (sb *serverBuilder) Add(desc *grpc.ServiceDesc, impl any) *serverBuilder {
 // gracefully stop the server if one is caught, causing the start() function to
 // exit.
 func (sb *serverBuilder) Build(tlsConfig *tls.Config, statsRegistry prometheus.Registerer, clk clock.Clock) (func() error, error) {
-	// Add the health service to all servers.
-	healthSrv := health.NewServer()
-	sb = sb.Add(&healthpb.Health_ServiceDesc, healthSrv)
+	// Register the health service with the server.
+	sb.healthSrv = health.NewServer()
+	sb.Add(&healthpb.Health_ServiceDesc, sb.healthSrv)

 	// Check to see if any of the calls to .Add() resulted in an error.
 	if sb.err != nil {
@@ -169,17 +192,88 @@ func (sb *serverBuilder) Build(tlsConfig *tls.Config, statsRegistry prometheus.R
 		return server.Serve(listener)
 	}

+	// Initialize long-running health checks of all services which implement the
+	// checker interface.
+	if sb.checkInterval <= 0 {
+		sb.checkInterval = 5 * time.Second
+	}
+	healthCtx, stopHealthChecks := context.WithCancel(context.Background())
+	for _, s := range sb.services {
+		check, ok := s.impl.(checker)
+		if !ok {
+			continue
+		}
+		sb.initLongRunningCheck(healthCtx, s.desc.ServiceName, check.Health)
+	}
+
 	// Start a goroutine which listens for a termination signal, and then
 	// gracefully stops the gRPC server. This in turn causes the start() function
 	// to exit, allowing its caller (generally a main() function) to exit.
 	go cmd.CatchSignals(func() {
-		healthSrv.Shutdown()
+		stopHealthChecks()
+		sb.healthSrv.Shutdown()
 		server.GracefulStop()
 	})

 	return start, nil
 }

+// initLongRunningCheck initializes a goroutine which will periodically check
+// the health of the provided service and update the health server accordingly.
+func (sb *serverBuilder) initLongRunningCheck(shutdownCtx context.Context, service string, checkImpl func(context.Context) error) {
+	// Set the initial health status for the service.
+	sb.healthSrv.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
+
+	// check is a helper function that checks the health of the service and, if
+	// necessary, updates its status in the health server.
+	checkAndMaybeUpdate := func(checkCtx context.Context, last healthpb.HealthCheckResponse_ServingStatus) healthpb.HealthCheckResponse_ServingStatus {
+		// Make a context with a timeout at 90% of the interval.
+		checkImplCtx, cancel := context.WithTimeout(checkCtx, sb.checkInterval*9/10)
+		defer cancel()
+
+		var next healthpb.HealthCheckResponse_ServingStatus
+		err := checkImpl(checkImplCtx)
+		if err != nil {
+			next = healthpb.HealthCheckResponse_NOT_SERVING
+		} else {
+			next = healthpb.HealthCheckResponse_SERVING
+		}
+
+		if last == next {
+			// No change in health status.
+			return next
+		}
+
+		if next != healthpb.HealthCheckResponse_SERVING {
+			sb.logger.Errf("transitioning health of %q from %q to %q, due to: %s", service, last, next, err)
+		} else {
+			sb.logger.Infof("transitioning health of %q from %q to %q", service, last, next)
+		}
+		sb.healthSrv.SetServingStatus(service, next)
+		return next
+	}
+
+	go func() {
+		ticker := time.NewTicker(sb.checkInterval)
+		defer ticker.Stop()
+
+		// Assume the service is not healthy to start.
+		last := healthpb.HealthCheckResponse_NOT_SERVING
+
+		// Check immediately, and then at the specified interval.
+		last = checkAndMaybeUpdate(shutdownCtx, last)
+		for {
+			select {
+			case <-shutdownCtx.Done():
+				// The server is shutting down.
+				return
+			case <-ticker.C:
+				last = checkAndMaybeUpdate(shutdownCtx, last)
+			}
+		}
+	}()
+}
+
 // serverMetrics is a struct type used to return a few registered metrics from
 // `newServerMetrics`
 type serverMetrics struct {
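Two details of the loop above are worth noting: each probe runs under a deadline of 90% of the check interval, so a hung Health call cannot overlap the next tick, and every checked service starts out NOT_SERVING, so a freshly started process is not considered healthy until its first deep check passes.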
@@ -0,0 +1,72 @@
+package grpc
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	blog "github.com/letsencrypt/boulder/log"
+	"github.com/letsencrypt/boulder/test"
+	"google.golang.org/grpc/health"
+)
+
+func Test_serverBuilder_initLongRunningCheck(t *testing.T) {
+	t.Parallel()
+	hs := health.NewServer()
+	mockLogger := blog.NewMock()
+	sb := &serverBuilder{
+		healthSrv:     hs,
+		logger:        mockLogger,
+		checkInterval: time.Millisecond * 50,
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	count := 0
+	failEveryThirdCheck := func(context.Context) error {
+		count++
+		if count%3 == 0 {
+			return errors.New("oops")
+		}
+		return nil
+	}
+	sb.initLongRunningCheck(ctx, "test", failEveryThirdCheck)
+	time.Sleep(time.Millisecond * 110)
+	cancel()
+
+	// We expect the following transition timeline:
+	//   - ~0ms   1st check passed, NOT_SERVING to SERVING
+	//   - ~50ms  2nd check passed, [no transition]
+	//   - ~100ms 3rd check failed, SERVING to NOT_SERVING
+	serving := mockLogger.GetAllMatching(".*\"NOT_SERVING\" to \"SERVING\"")
+	notServing := mockLogger.GetAllMatching((".*\"SERVING\" to \"NOT_SERVING\""))
+	test.Assert(t, len(serving) == 1, "expected one serving log line")
+	test.Assert(t, len(notServing) == 1, "expected one not serving log line")
+
+	mockLogger.Clear()
+
+	ctx, cancel = context.WithCancel(context.Background())
+	defer cancel()
+
+	count = 0
+	failEveryOtherCheck := func(context.Context) error {
+		count++
+		if count%2 == 0 {
+			return errors.New("oops")
+		}
+		return nil
+	}
+	sb.initLongRunningCheck(ctx, "test", failEveryOtherCheck)
+	time.Sleep(time.Millisecond * 110)
+	cancel()
+
+	// We expect the following transition timeline:
+	//   - ~0ms   1st check passed, NOT_SERVING to SERVING
+	//   - ~50ms  2nd check failed, SERVING to NOT_SERVING
+	//   - ~100ms 3rd check passed, NOT_SERVING to SERVING
+	serving = mockLogger.GetAllMatching(".*\"NOT_SERVING\" to \"SERVING\"")
+	notServing = mockLogger.GetAllMatching((".*\"SERVING\" to \"NOT_SERVING\""))
+	test.Assert(t, len(serving) == 2, "expected two serving log lines")
+	test.Assert(t, len(notServing) == 1, "expected one not serving log line")
+}
sa/sa.go
@@ -901,3 +901,17 @@ func (ssa *SQLStorageAuthority) AddBlockedKey(ctx context.Context, req *sapb.Add
 	}
 	return &emptypb.Empty{}, nil
 }
+
+// Health implements the grpc.checker interface.
+func (ssa *SQLStorageAuthority) Health(ctx context.Context) error {
+	err := ssa.dbMap.WithContext(ctx).SelectOne(new(int), "SELECT 1")
+	if err != nil {
+		return err
+	}
+
+	err = ssa.SQLStorageAuthorityRO.Health(ctx)
+	if err != nil {
+		return err
+	}
+	return nil
+}
@@ -1353,3 +1353,12 @@ func (ssa *SQLStorageAuthorityRO) GetMaxExpiration(ctx context.Context, req *emp
 func (ssa *SQLStorageAuthority) GetMaxExpiration(ctx context.Context, req *emptypb.Empty) (*timestamppb.Timestamp, error) {
 	return ssa.SQLStorageAuthorityRO.GetMaxExpiration(ctx, req)
 }
+
+// Health implements the grpc.checker interface.
+func (ssa *SQLStorageAuthorityRO) Health(ctx context.Context) error {
+	err := ssa.dbReadOnlyMap.WithContext(ctx).SelectOne(new(int), "SELECT 1")
+	if err != nil {
+		return err
+	}
+	return nil
+}
@@ -48,6 +48,7 @@
 			}
 		}
 	},
+	"healthCheckInterval": "4s",
 	"features": {
 		"StoreRevokerInfo": true
 	}
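Note: the "healthCheckInterval" key above exercises the new ServiceConfig.HealthCheckInterval field from cmd/config.go, overriding the 5-second default that Build applies when no interval is configured (assuming the usual case-insensitive JSON-to-field mapping).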
@@ -1,6 +1,6 @@
 client_addr = "0.0.0.0"
 bind_addr   = "10.55.55.10"
-log_level   = "INFO"
+log_level   = "ERROR"
 // When set, uses a subset of the agent's TLS configuration (key_file,
 // cert_file, ca_file, ca_path, and server_name) to set up the client for HTTP
 // or gRPC health checks. This allows services requiring 2-way TLS to be checked
@@ -233,15 +233,35 @@ services {
   address = "10.77.77.77"
   port    = 9095
   tags    = ["tcp"] // Required for SRV RR support in gRPC DNS resolution.
-  check {
-    id              = "sa-a-grpc"
-    name            = "sa-a-grpc"
-    grpc            = "10.77.77.77:9095"
-    grpc_use_tls    = true
-    tls_server_name = "sa.boulder"
-    tls_skip_verify = false
-    interval        = "5s"
-  }
+  checks = [
+    {
+      id              = "sa-a-grpc"
+      name            = "sa-a-grpc"
+      grpc            = "10.77.77.77:9095"
+      grpc_use_tls    = true
+      tls_server_name = "sa.boulder"
+      tls_skip_verify = false
+      interval        = "2s"
+    },
+    {
+      id              = "sa-a-grpc-sa"
+      name            = "sa-a-grpc-sa"
+      grpc            = "10.77.77.77:9095/sa.StorageAuthority"
+      grpc_use_tls    = true
+      tls_server_name = "sa.boulder"
+      tls_skip_verify = false
+      interval        = "2s"
+    },
+    {
+      id              = "sa-a-grpc-saro"
+      name            = "sa-a-grpc-saro"
+      grpc            = "10.77.77.77:9095/sa.StorageAuthorityReadOnly"
+      grpc_use_tls    = true
+      tls_server_name = "sa.boulder"
+      tls_skip_verify = false
+      interval        = "2s"
+    }
+  ]
 }

 services {
@@ -250,15 +270,35 @@ services {
   address = "10.88.88.88"
   port    = 9095
   tags    = ["tcp"] // Required for SRV RR support in gRPC DNS resolution.
-  check {
-    id              = "sa-b-grpc"
-    name            = "sa-b-grpc"
-    grpc            = "10.88.88.88:9095"
-    grpc_use_tls    = true
-    tls_server_name = "sa.boulder"
-    tls_skip_verify = false
-    interval        = "5s"
-  }
+  checks = [
+    {
+      id              = "sa-b-grpc"
+      name            = "sa-b-grpc"
+      grpc            = "10.88.88.88:9095"
+      grpc_use_tls    = true
+      tls_server_name = "sa.boulder"
+      tls_skip_verify = false
+      interval        = "2s"
+    },
+    {
+      id              = "sa-b-grpc-sa"
+      name            = "sa-b-grpc-sa"
+      grpc            = "10.88.88.88:9095/sa.StorageAuthority"
+      grpc_use_tls    = true
+      tls_server_name = "sa.boulder"
+      tls_skip_verify = false
+      interval        = "2s"
+    },
+    {
+      id              = "sa-b-grpc-saro"
+      name            = "sa-b-grpc-saro"
+      grpc            = "10.88.88.88:9095/sa.StorageAuthorityReadOnly"
+      grpc_use_tls    = true
+      tls_server_name = "sa.boulder"
+      tls_skip_verify = false
+      interval        = "2s"
+    }
+  ]
 }

 services {
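The host:port/service form of the grpc stanza makes Consul query the standard grpc.health.v1 protocol for a named service rather than for the server as a whole. A minimal sketch of the equivalent query from Go (plaintext credentials for brevity; the checks above actually use TLS with server name sa.boulder):

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Address taken from the sa-a service definition above.
	conn, err := grpc.Dial("10.77.77.77:9095",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// An empty Service field queries overall server health; naming a
	// registered service (as the sa-a-grpc-sa check does) queries only it.
	resp, err := healthpb.NewHealthClient(conn).Check(ctx,
		&healthpb.HealthCheckRequest{Service: "sa.StorageAuthority"})
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.Status) // SERVING or NOT_SERVING
}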
@@ -13,6 +13,9 @@ service rsyslog start
 # make sure we can reach the mysqldb.
 ./test/wait-for-it.sh boulder-mysql 3306

+# make sure we can reach the proxysql.
+./test/wait-for-it.sh bproxysql 6032
+
 # create the database
 MYSQL_CONTAINER=1 $DIR/create_db.sh