gRPC: publish RPC latency stat in server interceptor. (#3665)

We may see RPCs that are dispatched by a client but do not arrive at the server until some time afterwards. To gain insight into potential request latency at this layer, we want to publish the time delta between when a client sent an RPC and when the server received it.

This PR updates the gRPC client interceptor to add the current time to the gRPC request metadata context when it dispatches an RPC. The server-side interceptor is updated to pull the client request time out of the gRPC request metadata. Using this timestamp, it can calculate the latency and publish it as an observation on a Prometheus histogram.
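In short: the client stamps the outgoing request metadata with its clock's unix-nanosecond reading, and the server parses that stamp and observes the difference against its own clock. Below is a minimal, self-contained sketch of that arithmetic; the helper names are illustrative only, and the real logic lives inside the interceptors in the diff that follows:

package main

import (
	"fmt"
	"strconv"
	"time"
)

// Client side: format the send time as unix nanoseconds for embedding in the
// gRPC request metadata under the "client-request-time" key.
func requestTimestamp(now time.Time) string {
	return strconv.FormatInt(now.UnixNano(), 10)
}

// Server side: parse the metadata value back into a time.Time and compute the
// elapsed duration, which the server interceptor observes on its rpc_lag histogram.
func rpcLag(clientReqTime string, now time.Time) (time.Duration, error) {
	reqTimeUnixNanos, err := strconv.ParseInt(clientReqTime, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("illegal client-request-time value %q: %s", clientReqTime, err)
	}
	return now.Sub(time.Unix(0, reqTimeUnixNanos)), nil
}

func main() {
	stamp := requestTimestamp(time.Now())
	// ... the RPC and its metadata travel from client to server ...
	lag, _ := rpcLag(stamp, time.Now())
	fmt.Printf("observed lag: %s\n", lag)
}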

Accomplishing the above required wiring a clock through to each of the client interceptors, which caused a small diff across each of the gRPC-aware Boulder commands.

A small unit test is included in this PR that checks that a latency stat is published to the histogram after an RPC is made to a test ChillerServer. It's difficult to do more in-depth testing: using fake clocks makes the latency 0, and using real clocks would require a way to queue/delay requests inside gRPC machinery that isn't exposed to Boulder.
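To make the fake-clock limitation concrete: the client and server interceptors in the test share one fake clock, and nothing advances it between the client stamping the request and the server observing it, so the computed delta is always zero. A sketch, using the github.com/jmhodges/clock package these tests already import:

package main

import (
	"fmt"

	"github.com/jmhodges/clock"
)

func main() {
	clk := clock.NewFake()
	sent := clk.Now() // the client interceptor stamps the request here
	// ... the RPC is dispatched and handled; the fake clock never ticks ...
	fmt.Println(clk.Since(sent)) // always 0s, so the observed latency is 0
}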

Updates https://github.com/letsencrypt/boulder/issues/3635 - Still TODO: Explicitly logging latency in the VA, tracking outstanding RPCs as a gauge.
Daniel McCarney 2018-04-25 18:37:22 -04:00 committed by Jacob Hoffman-Andrews
parent 4dcbf5c883
commit aa810a3142
16 changed files with 232 additions and 74 deletions

View File

@@ -65,8 +65,10 @@ func setupContext(c config) (core.RegistrationAuthority, blog.Logger, *gorp.DbMa
  tlsConfig, err := c.Revoker.TLS.Load()
  cmd.FailOnError(err, "TLS config")
+ clk := cmd.Clock()
  clientMetrics := bgrpc.NewClientMetrics(metrics.NewNoopScope())
- raConn, err := bgrpc.ClientSetup(c.Revoker.RAService, tlsConfig, clientMetrics)
+ raConn, err := bgrpc.ClientSetup(c.Revoker.RAService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to RA")
  rac := bgrpc.NewRegistrationAuthorityClient(rapb.NewRegistrationAuthorityClient(raConn))
@@ -75,7 +77,7 @@ func setupContext(c config) (core.RegistrationAuthority, blog.Logger, *gorp.DbMa
  dbMap, err := sa.NewDbMap(dbURL, c.Revoker.DBConfig.MaxDBConns)
  cmd.FailOnError(err, "Couldn't setup database connection")
- saConn, err := bgrpc.ClientSetup(c.Revoker.SAService, tlsConfig, clientMetrics)
+ saConn, err := bgrpc.ClientSetup(c.Revoker.SAService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
  sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(saConn))

View File

@@ -160,8 +160,10 @@ func main() {
  tlsConfig, err := c.CA.TLS.Load()
  cmd.FailOnError(err, "TLS config")
+ clk := cmd.Clock()
  clientMetrics := bgrpc.NewClientMetrics(scope)
- conn, err := bgrpc.ClientSetup(c.CA.SAService, tlsConfig, clientMetrics)
+ conn, err := bgrpc.ClientSetup(c.CA.SAService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
  sa := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(conn))
@@ -169,7 +171,7 @@ func main() {
  c.CA,
  sa,
  pa,
- cmd.Clock(),
+ clk,
  scope,
  issuers,
  kp,
@@ -177,8 +179,7 @@ func main() {
  cmd.FailOnError(err, "Failed to create CA impl")
  serverMetrics := bgrpc.NewServerMetrics(scope)
- caSrv, caListener, err := bgrpc.NewServer(c.CA.GRPCCA, tlsConfig, serverMetrics)
+ caSrv, caListener, err := bgrpc.NewServer(c.CA.GRPCCA, tlsConfig, serverMetrics, clk)
  cmd.FailOnError(err, "Unable to setup CA gRPC server")
  caWrapper := bgrpc.NewCertificateAuthorityServer(cai)
  caPB.RegisterCertificateAuthorityServer(caSrv, caWrapper)
@@ -186,7 +187,7 @@ func main() {
  cmd.FailOnError(cmd.FilterShutdownErrors(caSrv.Serve(caListener)), "CA gRPC service failed")
  }()
- ocspSrv, ocspListener, err := bgrpc.NewServer(c.CA.GRPCOCSPGenerator, tlsConfig, serverMetrics)
+ ocspSrv, ocspListener, err := bgrpc.NewServer(c.CA.GRPCOCSPGenerator, tlsConfig, serverMetrics, clk)
  cmd.FailOnError(err, "Unable to setup CA gRPC server")
  ocspWrapper := bgrpc.NewCertificateAuthorityServer(cai)
  caPB.RegisterOCSPGeneratorServer(ocspSrv, ocspWrapper)

View File

@@ -79,8 +79,10 @@ func main() {
  tlsConfig, err := c.Publisher.TLS.Load()
  cmd.FailOnError(err, "TLS config")
+ clk := cmd.Clock()
  clientMetrics := bgrpc.NewClientMetrics(scope)
- conn, err := bgrpc.ClientSetup(c.Publisher.SAService, tlsConfig, clientMetrics)
+ conn, err := bgrpc.ClientSetup(c.Publisher.SAService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
  sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(conn))
@@ -92,7 +94,7 @@ func main() {
  sac)
  serverMetrics := bgrpc.NewServerMetrics(scope)
- grpcSrv, l, err := bgrpc.NewServer(c.Publisher.GRPC, tlsConfig, serverMetrics)
+ grpcSrv, l, err := bgrpc.NewServer(c.Publisher.GRPC, tlsConfig, serverMetrics, clk)
  cmd.FailOnError(err, "Unable to setup Publisher gRPC server")
  gw := bgrpc.NewPublisherServerWrapper(pubi)
  pubPB.RegisterPublisherServer(grpcSrv, gw)

View File

@@ -149,26 +149,28 @@ func main() {
  tlsConfig, err := c.RA.TLS.Load()
  cmd.FailOnError(err, "TLS config")
+ clk := cmd.Clock()
  clientMetrics := bgrpc.NewClientMetrics(scope)
- vaConn, err := bgrpc.ClientSetup(c.RA.VAService, tlsConfig, clientMetrics)
+ vaConn, err := bgrpc.ClientSetup(c.RA.VAService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Unable to create VA client")
  vac := bgrpc.NewValidationAuthorityGRPCClient(vaConn)
  caaClient := vaPB.NewCAAClient(vaConn)
- caConn, err := bgrpc.ClientSetup(c.RA.CAService, tlsConfig, clientMetrics)
+ caConn, err := bgrpc.ClientSetup(c.RA.CAService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Unable to create CA client")
  // Build a CA client that is only capable of issuing certificates, not
  // signing OCSP. TODO(jsha): Once we've fully moved to gRPC, replace this
  // with a plain caPB.NewCertificateAuthorityClient.
  cac := bgrpc.NewCertificateAuthorityClient(caPB.NewCertificateAuthorityClient(caConn), nil)
- raConn, err := bgrpc.ClientSetup(c.RA.PublisherService, tlsConfig, clientMetrics)
+ raConn, err := bgrpc.ClientSetup(c.RA.PublisherService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to Publisher")
  pubc := bgrpc.NewPublisherClientWrapper(pubPB.NewPublisherClient(raConn))
  var ctp *ctpolicy.CTPolicy
- conn, err := bgrpc.ClientSetup(c.RA.PublisherService, tlsConfig, clientMetrics)
+ conn, err := bgrpc.ClientSetup(c.RA.PublisherService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to Publisher")
  pubc = bgrpc.NewPublisherClientWrapper(pubPB.NewPublisherClient(conn))
@@ -185,7 +187,7 @@ func main() {
  ctp = ctpolicy.New(pubc, c.RA.CTLogGroups2, c.RA.InformationalCTLogs, logger, scope)
  }
- saConn, err := bgrpc.ClientSetup(c.RA.SAService, tlsConfig, clientMetrics)
+ saConn, err := bgrpc.ClientSetup(c.RA.SAService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
  sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(saConn))
@@ -209,7 +211,7 @@ func main() {
  }
  rai := ra.NewRegistrationAuthorityImpl(
- cmd.Clock(),
+ clk,
  logger,
  scope,
  c.RA.MaxContactsPerRegistration,
@@ -243,14 +245,14 @@ func main() {
  raDNSTimeout,
  c.RA.DNSResolvers,
  scope,
- cmd.Clock(),
+ clk,
  dnsTries)
  } else {
  rai.DNSClient = bdns.NewTestDNSClientImpl(
  raDNSTimeout,
  c.RA.DNSResolvers,
  scope,
- cmd.Clock(),
+ clk,
  dnsTries)
  }
@@ -259,7 +261,7 @@ func main() {
  rai.SA = sac
  serverMetrics := bgrpc.NewServerMetrics(scope)
- grpcSrv, listener, err := bgrpc.NewServer(c.RA.GRPC, tlsConfig, serverMetrics)
+ grpcSrv, listener, err := bgrpc.NewServer(c.RA.GRPC, tlsConfig, serverMetrics, clk)
  cmd.FailOnError(err, "Unable to setup RA gRPC server")
  gw := bgrpc.NewRegistrationAuthorityServer(rai)
  rapb.RegisterRegistrationAuthorityServer(grpcSrv, gw)

View File

@@ -75,17 +75,19 @@ func main() {
  }
  go sa.ReportDbConnCount(dbMap, scope)
+ clk := cmd.Clock()
  parallel := saConf.ParallelismPerRPC
  if parallel < 1 {
  parallel = 1
  }
- sai, err := sa.NewSQLStorageAuthority(dbMap, cmd.Clock(), logger, scope, parallel)
+ sai, err := sa.NewSQLStorageAuthority(dbMap, clk, logger, scope, parallel)
  cmd.FailOnError(err, "Failed to create SA impl")
  tls, err := c.SA.TLS.Load()
  cmd.FailOnError(err, "TLS config")
  serverMetrics := bgrpc.NewServerMetrics(scope)
- grpcSrv, listener, err := bgrpc.NewServer(c.SA.GRPC, tls, serverMetrics)
+ grpcSrv, listener, err := bgrpc.NewServer(c.SA.GRPC, tls, serverMetrics, clk)
  cmd.FailOnError(err, "Unable to setup SA gRPC server")
  gw := bgrpc.NewStorageAuthorityServer(sai)
  sapb.RegisterStorageAuthorityServer(grpcSrv, gw)

View File

@@ -126,7 +126,7 @@ func main() {
  var remotes []va.RemoteVA
  if len(c.VA.RemoteVAs) > 0 {
  for _, rva := range c.VA.RemoteVAs {
- vaConn, err := bgrpc.ClientSetup(&rva, tlsConfig, clientMetrics)
+ vaConn, err := bgrpc.ClientSetup(&rva, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Unable to create remote VA client")
  remotes = append(
  remotes,
@@ -151,7 +151,7 @@ func main() {
  logger)
  serverMetrics := bgrpc.NewServerMetrics(scope)
- grpcSrv, l, err := bgrpc.NewServer(c.VA.GRPC, tlsConfig, serverMetrics)
+ grpcSrv, l, err := bgrpc.NewServer(c.VA.GRPC, tlsConfig, serverMetrics, clk)
  cmd.FailOnError(err, "Unable to setup VA gRPC server")
  err = bgrpc.RegisterValidationAuthorityGRPCServer(grpcSrv, vai)
  cmd.FailOnError(err, "Unable to register VA gRPC server")

View File

@@ -7,6 +7,7 @@ import (
  "net/http"
  "os"
+ "github.com/jmhodges/clock"
  "github.com/letsencrypt/boulder/cmd"
  "github.com/letsencrypt/boulder/core"
  "github.com/letsencrypt/boulder/features"
@@ -60,16 +61,16 @@ type config struct {
  }
  }
- func setupWFE(c config, logger blog.Logger, stats metrics.Scope) (core.RegistrationAuthority, core.StorageAuthority) {
+ func setupWFE(c config, logger blog.Logger, stats metrics.Scope, clk clock.Clock) (core.RegistrationAuthority, core.StorageAuthority) {
  tlsConfig, err := c.WFE.TLS.Load()
  cmd.FailOnError(err, "TLS config")
  clientMetrics := bgrpc.NewClientMetrics(stats)
- raConn, err := bgrpc.ClientSetup(c.WFE.RAService, tlsConfig, clientMetrics)
+ raConn, err := bgrpc.ClientSetup(c.WFE.RAService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to RA")
  rac := bgrpc.NewRegistrationAuthorityClient(rapb.NewRegistrationAuthorityClient(raConn))
- saConn, err := bgrpc.ClientSetup(c.WFE.SAService, tlsConfig, clientMetrics)
+ saConn, err := bgrpc.ClientSetup(c.WFE.SAService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
  sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(saConn))
@@ -95,11 +96,13 @@ func main() {
  defer logger.AuditPanic()
  logger.Info(cmd.VersionString())
+ clk := cmd.Clock()
  kp, err := goodkey.NewKeyPolicy("") // don't load any weak keys
  cmd.FailOnError(err, "Unable to create key policy")
- wfe, err := wfe.NewWebFrontEndImpl(scope, cmd.Clock(), kp, logger)
+ wfe, err := wfe.NewWebFrontEndImpl(scope, clk, kp, logger)
  cmd.FailOnError(err, "Unable to create WFE")
- rac, sac := setupWFE(c, logger, scope)
+ rac, sac := setupWFE(c, logger, scope, clk)
  wfe.RA = rac
  wfe.SA = sac

View File

@@ -11,6 +11,7 @@ import (
  "net/http"
  "os"
+ "github.com/jmhodges/clock"
  "github.com/letsencrypt/boulder/cmd"
  "github.com/letsencrypt/boulder/core"
  "github.com/letsencrypt/boulder/features"
@@ -170,15 +171,15 @@ func loadCertificateChains(chainConfig map[string][]string) (map[string][]byte,
  return results, nil
  }
- func setupWFE(c config, logger blog.Logger, stats metrics.Scope) (core.RegistrationAuthority, core.StorageAuthority) {
+ func setupWFE(c config, logger blog.Logger, stats metrics.Scope, clk clock.Clock) (core.RegistrationAuthority, core.StorageAuthority) {
  tlsConfig, err := c.WFE.TLS.Load()
  cmd.FailOnError(err, "TLS config")
  clientMetrics := bgrpc.NewClientMetrics(stats)
- raConn, err := bgrpc.ClientSetup(c.WFE.RAService, tlsConfig, clientMetrics)
+ raConn, err := bgrpc.ClientSetup(c.WFE.RAService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to RA")
  rac := bgrpc.NewRegistrationAuthorityClient(rapb.NewRegistrationAuthorityClient(raConn))
- saConn, err := bgrpc.ClientSetup(c.WFE.SAService, tlsConfig, clientMetrics)
+ saConn, err := bgrpc.ClientSetup(c.WFE.SAService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
  sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(saConn))
@@ -207,11 +208,13 @@ func main() {
  defer logger.AuditPanic()
  logger.Info(cmd.VersionString())
+ clk := cmd.Clock()
  kp, err := goodkey.NewKeyPolicy("") // don't load any weak keys
  cmd.FailOnError(err, "Unable to create key policy")
- wfe, err := wfe2.NewWebFrontEndImpl(scope, cmd.Clock(), kp, certChains, logger)
+ wfe, err := wfe2.NewWebFrontEndImpl(scope, clk, kp, certChains, logger)
  cmd.FailOnError(err, "Unable to create WFE")
- rac, sac := setupWFE(c, logger, scope)
+ rac, sac := setupWFE(c, logger, scope, clk)
  wfe.RA = rac
  wfe.SA = sac

View File

@@ -481,8 +481,10 @@ func main() {
  tlsConfig, err := c.Mailer.TLS.Load()
  cmd.FailOnError(err, "TLS config")
+ clk := cmd.Clock()
  clientMetrics := bgrpc.NewClientMetrics(scope)
- conn, err := bgrpc.ClientSetup(c.Mailer.SAService, tlsConfig, clientMetrics)
+ conn, err := bgrpc.ClientSetup(c.Mailer.SAService, tlsConfig, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
  sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(conn))
@@ -557,7 +559,7 @@ func main() {
  emailTemplate: tmpl,
  nagTimes: nags,
  limit: c.Mailer.CertLimit,
- clk: cmd.Clock(),
+ clk: clk,
  stats: initStats(scope),
  }

View File

@@ -715,7 +715,7 @@ type config struct {
  }
  }
- func setupClients(c cmd.OCSPUpdaterConfig, stats metrics.Scope) (
+ func setupClients(c cmd.OCSPUpdaterConfig, stats metrics.Scope, clk clock.Clock) (
  core.CertificateAuthority,
  core.Publisher,
  core.StorageAuthority,
@@ -727,18 +727,18 @@
  cmd.FailOnError(err, "TLS config")
  }
  clientMetrics := bgrpc.NewClientMetrics(stats)
- caConn, err := bgrpc.ClientSetup(c.OCSPGeneratorService, tls, clientMetrics)
+ caConn, err := bgrpc.ClientSetup(c.OCSPGeneratorService, tls, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to CA")
  // Make a CA client that is only capable of signing OCSP.
  // TODO(jsha): Once we've fully moved to gRPC, replace this
  // with a plain caPB.NewOCSPGeneratorClient.
  cac := bgrpc.NewCertificateAuthorityClient(nil, capb.NewOCSPGeneratorClient(caConn))
- publisherConn, err := bgrpc.ClientSetup(c.Publisher, tls, clientMetrics)
+ publisherConn, err := bgrpc.ClientSetup(c.Publisher, tls, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create connection to service")
  pubc := bgrpc.NewPublisherClientWrapper(pubPB.NewPublisherClient(publisherConn))
- conn, err := bgrpc.ClientSetup(c.SAService, tls, clientMetrics)
+ conn, err := bgrpc.ClientSetup(c.SAService, tls, clientMetrics, clk)
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
  sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(conn))
@@ -772,11 +772,12 @@ func main() {
  cmd.FailOnError(err, "Could not connect to database")
  go sa.ReportDbConnCount(dbMap, scope)
- cac, pubc, sac := setupClients(conf, scope)
+ clk := cmd.Clock()
+ cac, pubc, sac := setupClients(conf, scope, clk)
  updater, err := newUpdater(
  scope,
- cmd.Clock(),
+ clk,
  dbMap,
  cac,
  pubc,

View File

@@ -142,7 +142,7 @@ func setup(configFile string) (blog.Logger, core.StorageAuthority) {
  cmd.FailOnError(err, "TLS config")
  clientMetrics := bgrpc.NewClientMetrics(metrics.NewNoopScope())
- conn, err := bgrpc.ClientSetup(conf.SAService, tlsConfig, clientMetrics)
+ conn, err := bgrpc.ClientSetup(conf.SAService, tlsConfig, clientMetrics, cmd.Clock())
  cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
  sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(conn))

View File

@@ -5,6 +5,7 @@ import (
  "fmt"
  "github.com/grpc-ecosystem/go-grpc-prometheus"
+ "github.com/jmhodges/clock"
  "github.com/prometheus/client_golang/prometheus"
  "google.golang.org/grpc"
@@ -16,7 +17,7 @@ import (
  // a client certificate and validates the server certificate based
  // on the provided *tls.Config.
  // It dials the remote service and returns a grpc.ClientConn if successful.
- func ClientSetup(c *cmd.GRPCClientConfig, tls *tls.Config, clientMetrics *grpc_prometheus.ClientMetrics) (*grpc.ClientConn, error) {
+ func ClientSetup(c *cmd.GRPCClientConfig, tls *tls.Config, clientMetrics *grpc_prometheus.ClientMetrics, clk clock.Clock) (*grpc.ClientConn, error) {
  if len(c.ServerAddresses) == 0 {
  return nil, fmt.Errorf("boulder/grpc: ServerAddresses is empty")
  }
@@ -24,7 +25,7 @@ func ClientSetup(c *cmd.GRPCClientConfig, tls *tls.Config, clientMetrics *grpc_p
  return nil, errNilTLS
  }
- ci := clientInterceptor{c.Timeout.Duration, clientMetrics}
+ ci := clientInterceptor{c.Timeout.Duration, clientMetrics, clk}
  creds := bcreds.NewClientCredentials(tls.RootCAs, tls.Certificates)
  return grpc.Dial(
  "", // Since our staticResolver provides addresses we don't need to pass an address here

View File

@@ -10,8 +10,10 @@ import (
  "google.golang.org/grpc"
  "github.com/grpc-ecosystem/go-grpc-prometheus"
+ "github.com/jmhodges/clock"
  berrors "github.com/letsencrypt/boulder/errors"
  testproto "github.com/letsencrypt/boulder/grpc/test_proto"
+ "github.com/letsencrypt/boulder/metrics"
  "github.com/letsencrypt/boulder/test"
  )
@@ -24,8 +26,9 @@ func (s *errorServer) Chill(_ context.Context, _ *testproto.Time) (*testproto.Ti
  }
  func TestErrorWrapping(t *testing.T) {
- si := serverInterceptor{grpc_prometheus.NewServerMetrics()}
- ci := clientInterceptor{time.Second, grpc_prometheus.NewClientMetrics()}
+ serverMetrics := NewServerMetrics(metrics.NewNoopScope())
+ si := newServerInterceptor(serverMetrics, clock.NewFake())
+ ci := clientInterceptor{time.Second, grpc_prometheus.NewClientMetrics(), clock.NewFake()}
  srv := grpc.NewServer(grpc.UnaryInterceptor(si.intercept))
  es := &errorServer{}
  testproto.RegisterChillerServer(srv, es)

View File

@@ -1,9 +1,11 @@
  package grpc
  import (
+ "strconv"
  "time"
  grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
+ "github.com/jmhodges/clock"
  "golang.org/x/net/context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/codes"
@@ -13,21 +15,42 @@ import (
  "github.com/letsencrypt/boulder/features"
  )
+ const (
+ returnOverhead = 20 * time.Millisecond
+ meaningfulWorkOverhead = 100 * time.Millisecond
+ clientRequestTimeKey = "client-request-time"
+ serverLatencyKey = "server-latency"
+ )
  // serverInterceptor is a gRPC interceptor that adds Prometheus
  // metrics to requests handled by a gRPC server, and wraps Boulder-specific
  // errors for transmission in a grpc/metadata trailer (see bcodes.go).
  type serverInterceptor struct {
- serverMetrics *grpc_prometheus.ServerMetrics
+ metrics serverMetrics
+ clk clock.Clock
  }
- const returnOverhead = 20 * time.Millisecond
- const meaningfulWorkOverhead = 100 * time.Millisecond
+ func newServerInterceptor(metrics serverMetrics, clk clock.Clock) serverInterceptor {
+ return serverInterceptor{
+ metrics: metrics,
+ clk: clk,
+ }
+ }
  func (si *serverInterceptor) intercept(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  if info == nil {
  return nil, berrors.InternalServerError("passed nil *grpc.UnaryServerInfo")
  }
+ // Extract the grpc metadata from the context. If the context has
+ // a `clientRequestTimeKey` field, and it has a value, then observe the RPC
+ // latency with Prometheus.
+ if md, ok := metadata.FromContext(ctx); ok && len(md[clientRequestTimeKey]) > 0 {
+ if err := si.observeLatency(md[clientRequestTimeKey][0]); err != nil {
+ return nil, err
+ }
+ }
  if features.Enabled(features.RPCHeadroom) {
  // Shave 20 milliseconds off the deadline to ensure that if the RPC server times
  // out any sub-calls it makes (like DNS lookups, or onwards RPCs), it has a
@@ -52,13 +75,33 @@ func (si *serverInterceptor) intercept(ctx context.Context, req interface{}, inf
  defer cancel()
  }
- resp, err := si.serverMetrics.UnaryServerInterceptor()(ctx, req, info, handler)
+ resp, err := si.metrics.grpcMetrics.UnaryServerInterceptor()(ctx, req, info, handler)
  if err != nil {
  err = wrapError(ctx, err)
  }
  return resp, err
  }
+ // observeLatency is called with the `clientRequestTimeKey` value from
+ // a request's gRPC metadata. This string value is converted to a timestamp and
+ // used to calculate the latency between send and receive time. The latency is
+ // published to the server interceptor's rpcLag prometheus histogram. An error
+ // is returned if the `clientReqTime` string is not a valid timestamp.
+ func (si *serverInterceptor) observeLatency(clientReqTime string) error {
+ // Convert the metadata request time into an int64
+ reqTimeUnixNanos, err := strconv.ParseInt(clientReqTime, 10, 64)
+ if err != nil {
+ return berrors.InternalServerError("grpc metadata had illegal %s value: %q - %s",
+ clientRequestTimeKey, clientReqTime, err)
+ }
+ // Calculate the elapsed time since the client sent the RPC
+ reqTime := time.Unix(0, reqTimeUnixNanos)
+ elapsed := si.clk.Since(reqTime)
+ // Publish an RPC latency observation to the histogram
+ si.metrics.rpcLag.Observe(elapsed.Seconds())
+ return nil
+ }
  // clientInterceptor is a gRPC interceptor that adds Prometheus
  // metrics to sent requests, and disables FailFast. We disable FailFast because
  // non-FailFast mode is most similar to the old AMQP RPC layer: If a client
@@ -69,6 +112,7 @@ func (si *serverInterceptor) intercept(ctx context.Context, req interface{}, inf
  type clientInterceptor struct {
  timeout time.Duration
  clientMetrics *grpc_prometheus.ClientMetrics
+ clk clock.Clock
  }
  // intercept fulfils the grpc.UnaryClientInterceptor interface, it should be noted that while this API
@@ -86,12 +130,23 @@ func (ci *clientInterceptor) intercept(
  // Disable fail-fast so RPCs will retry until deadline, even if all backends
  // are down.
  opts = append(opts, grpc.FailFast(false))
- // Create grpc/metadata.Metadata to encode internal error type if one is returned
- md := metadata.New(nil)
- opts = append(opts, grpc.Trailer(&md))
+ // Convert the current unix nano timestamp to a string for embedding in the grpc metadata
+ nowTS := strconv.FormatInt(ci.clk.Now().UnixNano(), 10)
+ // Create a grpc/metadata.Metadata instance for the request metadata.
+ // Initialize it with the request time.
+ reqMD := metadata.New(map[string]string{clientRequestTimeKey: nowTS})
+ // Configure the localCtx with the metadata so it gets sent along in the request
+ localCtx = metadata.NewContext(localCtx, reqMD)
+ // Create a grpc/metadata.Metadata instance for a grpc.Trailer.
+ respMD := metadata.New(nil)
+ // Configure a grpc Trailer with respMD. This allows us to wrap error
+ // types in the server interceptor later on.
+ opts = append(opts, grpc.Trailer(&respMD))
  err := ci.clientMetrics.UnaryClientInterceptor()(localCtx, method, req, reply, cc, invoker, opts...)
  if err != nil {
- err = unwrapError(err, md)
+ err = unwrapError(err, respMD)
  }
  return err
  }

View File

@@ -14,6 +14,7 @@ import (
  "golang.org/x/net/context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
  "github.com/letsencrypt/boulder/features"
  "github.com/letsencrypt/boulder/grpc/test_proto"
@@ -40,20 +41,27 @@ func testInvoker(_ context.Context, method string, _, _ interface{}, _ *grpc.Cli
  }
  func TestServerInterceptor(t *testing.T) {
- si := serverInterceptor{grpc_prometheus.NewServerMetrics()}
+ serverMetrics := NewServerMetrics(metrics.NewNoopScope())
+ si := newServerInterceptor(serverMetrics, clock.NewFake())
+ md := metadata.New(map[string]string{clientRequestTimeKey: "0"})
+ ctxWithMetadata := metadata.NewContext(context.Background(), md)
  _, err := si.intercept(context.Background(), nil, nil, testHandler)
+ test.AssertError(t, err, "si.intercept didn't fail with a context missing metadata")
+ _, err = si.intercept(ctxWithMetadata, nil, nil, testHandler)
  test.AssertError(t, err, "si.intercept didn't fail with a nil grpc.UnaryServerInfo")
- _, err = si.intercept(context.Background(), nil, &grpc.UnaryServerInfo{FullMethod: "-service-test"}, testHandler)
+ _, err = si.intercept(ctxWithMetadata, nil, &grpc.UnaryServerInfo{FullMethod: "-service-test"}, testHandler)
  test.AssertNotError(t, err, "si.intercept failed with a non-nil grpc.UnaryServerInfo")
- _, err = si.intercept(context.Background(), 0, &grpc.UnaryServerInfo{FullMethod: "brokeTest"}, testHandler)
+ _, err = si.intercept(ctxWithMetadata, 0, &grpc.UnaryServerInfo{FullMethod: "brokeTest"}, testHandler)
  test.AssertError(t, err, "si.intercept didn't fail when handler returned an error")
  }
  func TestClientInterceptor(t *testing.T) {
- ci := clientInterceptor{time.Second, grpc_prometheus.NewClientMetrics()}
+ ci := clientInterceptor{time.Second, grpc_prometheus.NewClientMetrics(), clock.NewFake()}
  err := ci.intercept(context.Background(), "-service-test", nil, nil, nil, testInvoker)
  test.AssertNotError(t, err, "ci.intercept failed with a non-nil grpc.UnaryServerInfo")
@@ -66,7 +74,7 @@ func TestClientInterceptor(t *testing.T) {
  // timeout is reached, i.e. that FailFast is set to false.
  // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
  func TestFailFastFalse(t *testing.T) {
- ci := &clientInterceptor{100 * time.Millisecond, grpc_prometheus.NewClientMetrics()}
+ ci := &clientInterceptor{100 * time.Millisecond, grpc_prometheus.NewClientMetrics(), clock.NewFake()}
  conn, err := grpc.Dial("localhost:19876", // random, probably unused port
  grpc.WithInsecure(),
  grpc.WithBalancer(grpc.RoundRobin(newStaticResolver([]string{"localhost:19000"}))),
@@ -116,7 +124,8 @@ func TestTimeouts(t *testing.T) {
  }
  port := lis.Addr().(*net.TCPAddr).Port
- si := &serverInterceptor{NewServerMetrics(metrics.NewNoopScope())}
+ serverMetrics := NewServerMetrics(metrics.NewNoopScope())
+ si := newServerInterceptor(serverMetrics, clock.NewFake())
  s := grpc.NewServer(grpc.UnaryInterceptor(si.intercept))
  test_proto.RegisterChillerServer(s, &testServer{})
  go func() {
@@ -129,7 +138,7 @@ func TestTimeouts(t *testing.T) {
  defer s.Stop()
  // make client
- ci := &clientInterceptor{30 * time.Second, grpc_prometheus.NewClientMetrics()}
+ ci := &clientInterceptor{30 * time.Second, grpc_prometheus.NewClientMetrics(), clock.NewFake()}
  conn, err := grpc.Dial(net.JoinHostPort("localhost", fmt.Sprintf("%d", port)),
  grpc.WithInsecure(),
  grpc.WithUnaryInterceptor(ci.intercept))
@@ -161,3 +170,54 @@ func TestTimeouts(t *testing.T) {
  })
  }
  }
+ func TestRequestTimeTagging(t *testing.T) {
+ clk := clock.NewFake()
+ // Listen for TCP requests on a random system assigned port number
+ lis, err := net.Listen("tcp", ":0")
+ if err != nil {
+ log.Fatalf("failed to listen: %v", err)
+ }
+ // Retrieve the concrete port number the system assigned our listener
+ port := lis.Addr().(*net.TCPAddr).Port
+ // Create a new ChillerServer
+ serverMetrics := NewServerMetrics(metrics.NewNoopScope())
+ si := newServerInterceptor(serverMetrics, clk)
+ s := grpc.NewServer(grpc.UnaryInterceptor(si.intercept))
+ test_proto.RegisterChillerServer(s, &testServer{})
+ // Chill until ill
+ go func() {
+ start := time.Now()
+ if err := s.Serve(lis); err != nil &&
+ !strings.HasSuffix(err.Error(), "use of closed network connection") {
+ t.Fatalf("s.Serve: %v after %s", err, time.Since(start))
+ }
+ }()
+ defer s.Stop()
+ // Dial the ChillerServer
+ ci := &clientInterceptor{30 * time.Second, grpc_prometheus.NewClientMetrics(), clk}
+ conn, err := grpc.Dial(net.JoinHostPort("localhost", fmt.Sprintf("%d", port)),
+ grpc.WithInsecure(),
+ grpc.WithUnaryInterceptor(ci.intercept))
+ if err != nil {
+ t.Fatalf("did not connect: %v", err)
+ }
+ // Create a ChillerClient with the connection to the ChillerServer
+ c := test_proto.NewChillerClient(conn)
+ // Make an RPC request with the ChillerClient with a timeout higher than the
+ // requested ChillerServer delay so that the RPC completes normally
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ var delayTime int64 = (time.Second * 5).Nanoseconds()
+ _, err = c.Chill(ctx, &test_proto.Time{Time: &delayTime})
+ if err != nil {
+ t.Fatal(fmt.Sprintf("Unexpected error calling Chill RPC: %s", err))
+ }
+ // There should be one histogram sample in the serverInterceptor rpcLag stat
+ count := test.CountHistogramSamples(si.metrics.rpcLag)
+ test.AssertEquals(t, count, 1)
+ }

View File

@@ -6,6 +6,8 @@ import (
  "net"
  "github.com/grpc-ecosystem/go-grpc-prometheus"
+ "github.com/jmhodges/clock"
+ "github.com/prometheus/client_golang/prometheus"
  "google.golang.org/grpc"
  "github.com/letsencrypt/boulder/cmd"
@@ -15,17 +17,13 @@
  // CodedError is an alias required to appease go vet
  var CodedError = grpc.Errorf
- var errNilMetrics = errors.New("boulder/grpc: received nil ServerMetrics")
  var errNilTLS = errors.New("boulder/grpc: received nil tls.Config")
  // NewServer creates a gRPC server that uses the provided *tls.Config, and
  // verifies that clients present a certificate that (a) is signed by one of
  // the configured ClientCAs, and (b) contains at least one
  // subjectAlternativeName matching the accepted list from GRPCServerConfig.
- func NewServer(c *cmd.GRPCServerConfig, tls *tls.Config, serverMetrics *grpc_prometheus.ServerMetrics) (*grpc.Server, net.Listener, error) {
- if serverMetrics == nil {
- return nil, nil, errNilMetrics
- }
+ func NewServer(c *cmd.GRPCServerConfig, tls *tls.Config, metrics serverMetrics, clk clock.Clock) (*grpc.Server, net.Listener, error) {
  if tls == nil {
  return nil, nil, errNilTLS
  }
@@ -48,7 +46,7 @@ func NewServer(c *cmd.GRPCServerConfig, tls *tls.Config, serverMetrics *grpc_pro
  if maxConcurrentStreams == 0 {
  maxConcurrentStreams = 250
  }
- si := &serverInterceptor{serverMetrics}
+ si := newServerInterceptor(metrics, clk)
  return grpc.NewServer(
  grpc.Creds(creds),
  grpc.UnaryInterceptor(si.intercept),
@@ -56,12 +54,35 @@ func NewServer(c *cmd.GRPCServerConfig, tls *tls.Config, serverMetrics *grpc_pro
  ), l, nil
  }
- // NewServerMetrics constructs a *grpc_prometheus.ServerMetrics, registered with
- // the given registry, with timing histogram enabled. It must be called a
- // maximum of once per registry, or there will be conflicting names.
- func NewServerMetrics(stats registry) *grpc_prometheus.ServerMetrics {
- metrics := grpc_prometheus.NewServerMetrics()
- metrics.EnableHandlingTimeHistogram()
- stats.MustRegister(metrics)
- return metrics
+ // serverMetrics is a struct type used to return a few registered metrics from
+ // `NewServerMetrics`
+ type serverMetrics struct {
+ grpcMetrics *grpc_prometheus.ServerMetrics
+ rpcLag prometheus.Histogram
+ }
+ // NewServerMetrics registers metrics with a registry. It must be called a
+ // maximum of once per registry, or there will be conflicting names.
+ // It constructs and registers a *grpc_prometheus.ServerMetrics with timing
+ // histogram enabled as well as a prometheus Histogram for RPC latency.
+ func NewServerMetrics(stats registry) serverMetrics {
+ // Create the grpc prometheus server metrics instance and register it
+ grpcMetrics := grpc_prometheus.NewServerMetrics()
+ grpcMetrics.EnableHandlingTimeHistogram()
+ stats.MustRegister(grpcMetrics)
+ // rpcLag is a prometheus histogram tracking the difference between the time
+ // the client sent an RPC and the time the server received it. Create and
+ // register it.
+ rpcLag := prometheus.NewHistogram(
+ prometheus.HistogramOpts{
+ Name: "rpc_lag",
+ Help: "Delta between client RPC send time and server RPC receipt time",
+ })
+ stats.MustRegister(rpcLag)
+ return serverMetrics{
+ grpcMetrics: grpcMetrics,
+ rpcLag: rpcLag,
+ }
  }