Streamline gRPC client creation (#6472)
Remove the need for clients to explicitly call bgrpc.NewClientMetrics, by moving that call inside bgrpc.ClientSetup. In case ClientSetup is called multiple times, use the recommended method to gracefully recover from registering duplicate metrics. This makes gRPC client setup much more similar to gRPC server setup after the previous server refactoring change landed.
This commit is contained in:
parent
c791075e00
commit
0a02cdf7e3
|
|
@ -101,15 +101,14 @@ func newRevoker(c Config) *revoker {
|
|||
|
||||
clk := cmd.Clock()
|
||||
|
||||
clientMetrics := bgrpc.NewClientMetrics(metrics.NoopRegisterer)
|
||||
raConn, err := bgrpc.ClientSetup(c.Revoker.RAService, tlsConfig, clientMetrics, clk)
|
||||
raConn, err := bgrpc.ClientSetup(c.Revoker.RAService, tlsConfig, metrics.NoopRegisterer, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to RA")
|
||||
rac := rapb.NewRegistrationAuthorityClient(raConn)
|
||||
|
||||
dbMap, err := sa.InitWrappedDb(c.Revoker.DB, nil, logger)
|
||||
cmd.FailOnError(err, "While initializing dbMap")
|
||||
|
||||
saConn, err := bgrpc.ClientSetup(c.Revoker.SAService, tlsConfig, clientMetrics, clk)
|
||||
saConn, err := bgrpc.ClientSetup(c.Revoker.SAService, tlsConfig, metrics.NoopRegisterer, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := sapb.NewStorageAuthorityClient(saConn)
|
||||
|
||||
|
|
|
|||
|
|
@ -450,8 +450,7 @@ func main() {
|
|||
tlsConfig, err := config.BadKeyRevoker.TLS.Load()
|
||||
cmd.FailOnError(err, "TLS config")
|
||||
|
||||
clientMetrics := bgrpc.NewClientMetrics(scope)
|
||||
conn, err := bgrpc.ClientSetup(config.BadKeyRevoker.RAService, tlsConfig, clientMetrics, clk)
|
||||
conn, err := bgrpc.ClientSetup(config.BadKeyRevoker.RAService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to RA")
|
||||
rac := rapb.NewRegistrationAuthorityClient(conn)
|
||||
|
||||
|
|
|
|||
|
|
@ -225,9 +225,8 @@ func main() {
|
|||
cmd.FailOnError(err, "TLS config")
|
||||
|
||||
clk := cmd.Clock()
|
||||
clientMetrics := bgrpc.NewClientMetrics(scope)
|
||||
|
||||
conn, err := bgrpc.ClientSetup(c.CA.SAService, tlsConfig, clientMetrics, clk)
|
||||
conn, err := bgrpc.ClientSetup(c.CA.SAService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sa := sapb.NewStorageAuthorityClient(conn)
|
||||
|
||||
|
|
|
|||
|
|
@ -155,34 +155,33 @@ func main() {
|
|||
cmd.FailOnError(err, "TLS config")
|
||||
|
||||
clk := cmd.Clock()
|
||||
clientMetrics := bgrpc.NewClientMetrics(scope)
|
||||
|
||||
vaConn, err := bgrpc.ClientSetup(c.RA.VAService, tlsConfig, clientMetrics, clk)
|
||||
vaConn, err := bgrpc.ClientSetup(c.RA.VAService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Unable to create VA client")
|
||||
vac := vapb.NewVAClient(vaConn)
|
||||
caaClient := vapb.NewCAAClient(vaConn)
|
||||
|
||||
caConn, err := bgrpc.ClientSetup(c.RA.CAService, tlsConfig, clientMetrics, clk)
|
||||
caConn, err := bgrpc.ClientSetup(c.RA.CAService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Unable to create CA client")
|
||||
cac := capb.NewCertificateAuthorityClient(caConn)
|
||||
|
||||
var ocspc capb.OCSPGeneratorClient
|
||||
ocspc = cac
|
||||
if c.RA.OCSPService != nil {
|
||||
ocspConn, err := bgrpc.ClientSetup(c.RA.OCSPService, tlsConfig, clientMetrics, clk)
|
||||
ocspConn, err := bgrpc.ClientSetup(c.RA.OCSPService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Unable to create CA client")
|
||||
ocspc = capb.NewOCSPGeneratorClient(ocspConn)
|
||||
}
|
||||
|
||||
saConn, err := bgrpc.ClientSetup(c.RA.SAService, tlsConfig, clientMetrics, clk)
|
||||
saConn, err := bgrpc.ClientSetup(c.RA.SAService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := sapb.NewStorageAuthorityClient(saConn)
|
||||
|
||||
conn, err := bgrpc.ClientSetup(c.RA.PublisherService, tlsConfig, clientMetrics, clk)
|
||||
conn, err := bgrpc.ClientSetup(c.RA.PublisherService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to Publisher")
|
||||
pubc := pubpb.NewPublisherClient(conn)
|
||||
|
||||
apConn, err := bgrpc.ClientSetup(c.RA.AkamaiPurgerService, tlsConfig, clientMetrics, clk)
|
||||
apConn, err := bgrpc.ClientSetup(c.RA.AkamaiPurgerService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Unable to create a Akamai Purger client")
|
||||
apc := akamaipb.NewAkamaiPurgerClient(apConn)
|
||||
|
||||
|
|
|
|||
|
|
@ -154,12 +154,11 @@ func main() {
|
|||
tlsConfig, err := c.VA.TLS.Load()
|
||||
cmd.FailOnError(err, "tlsConfig config")
|
||||
|
||||
clientMetrics := bgrpc.NewClientMetrics(scope)
|
||||
var remotes []va.RemoteVA
|
||||
if len(c.VA.RemoteVAs) > 0 {
|
||||
for _, rva := range c.VA.RemoteVAs {
|
||||
rva := rva
|
||||
vaConn, err := bgrpc.ClientSetup(&rva, tlsConfig, clientMetrics, clk)
|
||||
vaConn, err := bgrpc.ClientSetup(&rva, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Unable to create remote VA client")
|
||||
remotes = append(
|
||||
remotes,
|
||||
|
|
|
|||
|
|
@ -276,27 +276,27 @@ func loadChain(certFiles []string) (*issuance.Certificate, []byte, error) {
|
|||
return certs[0], buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func setupWFE(c Config, stats prometheus.Registerer, clk clock.Clock) (rapb.RegistrationAuthorityClient, sapb.StorageAuthorityClient, noncepb.NonceServiceClient, map[string]noncepb.NonceServiceClient) {
|
||||
func setupWFE(c Config, scope prometheus.Registerer, clk clock.Clock) (rapb.RegistrationAuthorityClient, sapb.StorageAuthorityClient, noncepb.NonceServiceClient, map[string]noncepb.NonceServiceClient) {
|
||||
tlsConfig, err := c.WFE.TLS.Load()
|
||||
cmd.FailOnError(err, "TLS config")
|
||||
clientMetrics := bgrpc.NewClientMetrics(stats)
|
||||
raConn, err := bgrpc.ClientSetup(c.WFE.RAService, tlsConfig, clientMetrics, clk, bgrpc.CancelTo408Interceptor)
|
||||
|
||||
raConn, err := bgrpc.ClientSetup(c.WFE.RAService, tlsConfig, scope, clk, bgrpc.CancelTo408Interceptor)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to RA")
|
||||
rac := rapb.NewRegistrationAuthorityClient(raConn)
|
||||
|
||||
saConn, err := bgrpc.ClientSetup(c.WFE.SAService, tlsConfig, clientMetrics, clk, bgrpc.CancelTo408Interceptor)
|
||||
saConn, err := bgrpc.ClientSetup(c.WFE.SAService, tlsConfig, scope, clk, bgrpc.CancelTo408Interceptor)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := sapb.NewStorageAuthorityClient(saConn)
|
||||
|
||||
var rns noncepb.NonceServiceClient
|
||||
npm := map[string]noncepb.NonceServiceClient{}
|
||||
if c.WFE.GetNonceService != nil {
|
||||
rnsConn, err := bgrpc.ClientSetup(c.WFE.GetNonceService, tlsConfig, clientMetrics, clk, bgrpc.CancelTo408Interceptor)
|
||||
rnsConn, err := bgrpc.ClientSetup(c.WFE.GetNonceService, tlsConfig, scope, clk, bgrpc.CancelTo408Interceptor)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to get nonce service")
|
||||
rns = noncepb.NewNonceServiceClient(rnsConn)
|
||||
for prefix, serviceConfig := range c.WFE.RedeemNonceServices {
|
||||
serviceConfig := serviceConfig
|
||||
conn, err := bgrpc.ClientSetup(&serviceConfig, tlsConfig, clientMetrics, clk, bgrpc.CancelTo408Interceptor)
|
||||
conn, err := bgrpc.ClientSetup(&serviceConfig, tlsConfig, scope, clk, bgrpc.CancelTo408Interceptor)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to redeem nonce service")
|
||||
npm[prefix] = noncepb.NewNonceServiceClient(conn)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -139,17 +139,15 @@ func main() {
|
|||
c.CRLUpdater.LookbackPeriod.Duration = 24 * time.Hour
|
||||
}
|
||||
|
||||
clientMetrics := bgrpc.NewClientMetrics(scope)
|
||||
|
||||
saConn, err := bgrpc.ClientSetup(c.CRLUpdater.SAService, tlsConfig, clientMetrics, clk)
|
||||
saConn, err := bgrpc.ClientSetup(c.CRLUpdater.SAService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := sapb.NewStorageAuthorityClient(saConn)
|
||||
|
||||
caConn, err := bgrpc.ClientSetup(c.CRLUpdater.CRLGeneratorService, tlsConfig, clientMetrics, clk)
|
||||
caConn, err := bgrpc.ClientSetup(c.CRLUpdater.CRLGeneratorService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to CRLGenerator")
|
||||
cac := capb.NewCRLGeneratorClient(caConn)
|
||||
|
||||
csConn, err := bgrpc.ClientSetup(c.CRLUpdater.CRLStorerService, tlsConfig, clientMetrics, clk)
|
||||
csConn, err := bgrpc.ClientSetup(c.CRLUpdater.CRLStorerService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to CRLStorer")
|
||||
csc := cspb.NewCRLStorerClient(csConn)
|
||||
|
||||
|
|
|
|||
|
|
@ -664,8 +664,7 @@ func main() {
|
|||
|
||||
clk := cmd.Clock()
|
||||
|
||||
clientMetrics := bgrpc.NewClientMetrics(scope)
|
||||
conn, err := bgrpc.ClientSetup(c.Mailer.SAService, tlsConfig, clientMetrics, clk)
|
||||
conn, err := bgrpc.ClientSetup(c.Mailer.SAService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := sapb.NewStorageAuthorityClient(conn)
|
||||
|
||||
|
|
|
|||
|
|
@ -141,7 +141,7 @@ as generated by Boulder's ceremony command.
|
|||
beeline.Init(bc)
|
||||
defer beeline.Close()
|
||||
|
||||
stats, logger := cmd.StatsAndLogging(c.Syslog, c.OCSPResponder.DebugAddr)
|
||||
scope, logger := cmd.StatsAndLogging(c.Syslog, c.OCSPResponder.DebugAddr)
|
||||
defer logger.AuditPanic()
|
||||
logger.Info(cmd.VersionString())
|
||||
|
||||
|
|
@ -163,11 +163,11 @@ as generated by Boulder's ceremony command.
|
|||
// Set DB.DBConnect as a fallback if DB.DBConnectFile isn't present.
|
||||
config.DB.DBConnect = config.Source
|
||||
|
||||
dbMap, err := sa.InitWrappedDb(config.DB, stats, logger)
|
||||
dbMap, err := sa.InitWrappedDb(config.DB, scope, logger)
|
||||
cmd.FailOnError(err, "While initializing dbMap")
|
||||
|
||||
// Set up the redis source and the combined multiplex source.
|
||||
rocspReader, err := rocsp_config.MakeClient(&c.OCSPResponder.Redis, clk, stats)
|
||||
rocspReader, err := rocsp_config.MakeClient(&c.OCSPResponder.Redis, clk, scope)
|
||||
cmd.FailOnError(err, "Could not make redis client")
|
||||
|
||||
err = rocspReader.Ping(context.Background())
|
||||
|
|
@ -180,8 +180,8 @@ as generated by Boulder's ceremony command.
|
|||
|
||||
tlsConfig, err := c.OCSPResponder.TLS.Load()
|
||||
cmd.FailOnError(err, "TLS config")
|
||||
clientMetrics := bgrpc.NewClientMetrics(stats)
|
||||
raConn, err := bgrpc.ClientSetup(c.OCSPResponder.RAService, tlsConfig, clientMetrics, clk)
|
||||
|
||||
raConn, err := bgrpc.ClientSetup(c.OCSPResponder.RAService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to RA")
|
||||
rac := rapb.NewRegistrationAuthorityClient(raConn)
|
||||
|
||||
|
|
@ -191,16 +191,16 @@ as generated by Boulder's ceremony command.
|
|||
}
|
||||
liveSource := live.New(rac, int64(maxInflight))
|
||||
|
||||
rocspSource, err := redis_responder.NewRedisSource(rocspReader, liveSource, liveSigningPeriod, clk, stats, logger)
|
||||
rocspSource, err := redis_responder.NewRedisSource(rocspReader, liveSource, liveSigningPeriod, clk, scope, logger)
|
||||
cmd.FailOnError(err, "Could not create redis source")
|
||||
|
||||
var sac sapb.StorageAuthorityClient
|
||||
if c.OCSPResponder.SAService != nil {
|
||||
saConn, err := bgrpc.ClientSetup(c.OCSPResponder.SAService, tlsConfig, clientMetrics, clk)
|
||||
saConn, err := bgrpc.ClientSetup(c.OCSPResponder.SAService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac = sapb.NewStorageAuthorityClient(saConn)
|
||||
}
|
||||
source, err = redis_responder.NewCheckedRedisSource(rocspSource, dbMap, sac, stats, logger)
|
||||
source, err = redis_responder.NewCheckedRedisSource(rocspSource, dbMap, sac, scope, logger)
|
||||
cmd.FailOnError(err, "Could not create checkedRedis source")
|
||||
|
||||
// Load the certificate from the file path.
|
||||
|
|
@ -215,14 +215,14 @@ as generated by Boulder's ceremony command.
|
|||
issuerCerts,
|
||||
c.OCSPResponder.RequiredSerialPrefixes,
|
||||
source,
|
||||
stats,
|
||||
scope,
|
||||
logger,
|
||||
clk,
|
||||
)
|
||||
cmd.FailOnError(err, "Could not create filtered source")
|
||||
}
|
||||
|
||||
m := mux(c.OCSPResponder.Path, source, c.OCSPResponder.Timeout.Duration, stats, logger, c.OCSPResponder.LogSampleRate)
|
||||
m := mux(c.OCSPResponder.Path, source, c.OCSPResponder.Timeout.Duration, scope, logger, c.OCSPResponder.LogSampleRate)
|
||||
|
||||
// The gosec linter complains that ReadHeaderTimeout is not set. That's fine,
|
||||
// because that field inherits its value from ReadTimeout.
|
||||
|
|
|
|||
|
|
@ -85,11 +85,11 @@ func main() {
|
|||
beeline.Init(bc)
|
||||
defer beeline.Close()
|
||||
|
||||
stats, logger := cmd.StatsAndLogging(c.Syslog, conf.DebugAddr)
|
||||
scope, logger := cmd.StatsAndLogging(c.Syslog, conf.DebugAddr)
|
||||
defer logger.AuditPanic()
|
||||
logger.Info(cmd.VersionString())
|
||||
|
||||
readWriteDb, err := sa.InitWrappedDb(conf.DB, stats, logger)
|
||||
readWriteDb, err := sa.InitWrappedDb(conf.DB, scope, logger)
|
||||
cmd.FailOnError(err, "Failed to initialize database client")
|
||||
|
||||
var readOnlyDb *db.WrappedMap
|
||||
|
|
@ -97,7 +97,7 @@ func main() {
|
|||
if readOnlyDbDSN == "" {
|
||||
readOnlyDb = readWriteDb
|
||||
} else {
|
||||
readOnlyDb, err = sa.InitWrappedDb(conf.ReadOnlyDB, stats, logger)
|
||||
readOnlyDb, err = sa.InitWrappedDb(conf.ReadOnlyDB, scope, logger)
|
||||
cmd.FailOnError(err, "Failed to initialize read-only database client")
|
||||
}
|
||||
|
||||
|
|
@ -105,8 +105,8 @@ func main() {
|
|||
|
||||
tlsConfig, err := c.OCSPUpdater.TLS.Load()
|
||||
cmd.FailOnError(err, "TLS config")
|
||||
clientMetrics := bgrpc.NewClientMetrics(stats)
|
||||
caConn, err := bgrpc.ClientSetup(c.OCSPUpdater.OCSPGeneratorService, tlsConfig, clientMetrics, clk)
|
||||
|
||||
caConn, err := bgrpc.ClientSetup(c.OCSPUpdater.OCSPGeneratorService, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to CA")
|
||||
ogc := capb.NewOCSPGeneratorClient(caConn)
|
||||
|
||||
|
|
@ -116,7 +116,7 @@ func main() {
|
|||
}
|
||||
|
||||
updater, err := ocsp_updater.New(
|
||||
stats,
|
||||
scope,
|
||||
clk,
|
||||
readWriteDb,
|
||||
readOnlyDb,
|
||||
|
|
|
|||
|
|
@ -209,12 +209,11 @@ func newOrphanFinder(configFile string) *orphanFinder {
|
|||
tlsConfig, err := conf.TLS.Load()
|
||||
cmd.FailOnError(err, "TLS config")
|
||||
|
||||
clientMetrics := bgrpc.NewClientMetrics(metrics.NoopRegisterer)
|
||||
saConn, err := bgrpc.ClientSetup(conf.SAService, tlsConfig, clientMetrics, cmd.Clock())
|
||||
saConn, err := bgrpc.ClientSetup(conf.SAService, tlsConfig, metrics.NoopRegisterer, cmd.Clock())
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := sapb.NewStorageAuthorityClient(saConn)
|
||||
|
||||
caConn, err := bgrpc.ClientSetup(conf.OCSPGeneratorService, tlsConfig, clientMetrics, cmd.Clock())
|
||||
caConn, err := bgrpc.ClientSetup(conf.OCSPGeneratorService, tlsConfig, metrics.NoopRegisterer, cmd.Clock())
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to CA")
|
||||
cac := capb.NewOCSPGeneratorClient(caConn)
|
||||
|
||||
|
|
|
|||
|
|
@ -242,13 +242,13 @@ func helpExit() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
func configureOCSPGenerator(tlsConf cmd.TLSConfig, grpcConf cmd.GRPCClientConfig, clk clock.Clock, stats prometheus.Registerer) (capb.OCSPGeneratorClient, error) {
|
||||
func configureOCSPGenerator(tlsConf cmd.TLSConfig, grpcConf cmd.GRPCClientConfig, clk clock.Clock, scope prometheus.Registerer) (capb.OCSPGeneratorClient, error) {
|
||||
tlsConfig, err := tlsConf.Load()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("loading TLS config: %w", err)
|
||||
}
|
||||
clientMetrics := bgrpc.NewClientMetrics(stats)
|
||||
caConn, err := bgrpc.ClientSetup(&grpcConf, tlsConfig, clientMetrics, clk)
|
||||
|
||||
caConn, err := bgrpc.ClientSetup(&grpcConf, tlsConfig, scope, clk)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to CA")
|
||||
return capb.NewOCSPGeneratorClient(caConn), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ import (
|
|||
// a client certificate and validates the the server certificate based
|
||||
// on the provided *tls.Config.
|
||||
// It dials the remote service and returns a grpc.ClientConn if successful.
|
||||
func ClientSetup(c *cmd.GRPCClientConfig, tlsConfig *tls.Config, metrics clientMetrics, clk clock.Clock, interceptors ...grpc.UnaryClientInterceptor) (*grpc.ClientConn, error) {
|
||||
func ClientSetup(c *cmd.GRPCClientConfig, tlsConfig *tls.Config, statsRegistry prometheus.Registerer, clk clock.Clock, interceptors ...grpc.UnaryClientInterceptor) (*grpc.ClientConn, error) {
|
||||
if c == nil {
|
||||
return nil, errors.New("nil gRPC client config provided. JSON config is probably missing a fooService section.")
|
||||
}
|
||||
|
|
@ -34,6 +34,11 @@ func ClientSetup(c *cmd.GRPCClientConfig, tlsConfig *tls.Config, metrics clientM
|
|||
return nil, errNilTLS
|
||||
}
|
||||
|
||||
metrics, err := newClientMetrics(statsRegistry)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ci := clientInterceptor{c.Timeout.Duration, metrics, clk}
|
||||
|
||||
unaryInterceptors := append(interceptors, []grpc.UnaryClientInterceptor{
|
||||
|
|
@ -72,24 +77,40 @@ type clientMetrics struct {
|
|||
inFlightRPCs *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
// NewClientMetrics constructs a *grpc_prometheus.ClientMetrics, registered with
|
||||
// newClientMetrics constructs a *grpc_prometheus.ClientMetrics, registered with
|
||||
// the given registry, with timing histogram enabled. It must be called a
|
||||
// maximum of once per registry, or there will be conflicting names.
|
||||
func NewClientMetrics(stats prometheus.Registerer) clientMetrics {
|
||||
func newClientMetrics(stats prometheus.Registerer) (clientMetrics, error) {
|
||||
// Create the grpc prometheus client metrics instance and register it
|
||||
grpcMetrics := grpc_prometheus.NewClientMetrics()
|
||||
grpcMetrics.EnableClientHandlingTimeHistogram()
|
||||
stats.MustRegister(grpcMetrics)
|
||||
err := stats.Register(grpcMetrics)
|
||||
if err != nil {
|
||||
are := prometheus.AlreadyRegisteredError{}
|
||||
if errors.As(err, &are) {
|
||||
grpcMetrics = are.ExistingCollector.(*grpc_prometheus.ClientMetrics)
|
||||
} else {
|
||||
return clientMetrics{}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Create a gauge to track in-flight RPCs and register it.
|
||||
inFlightGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "grpc_in_flight",
|
||||
Help: "Number of in-flight (sent, not yet completed) RPCs",
|
||||
}, []string{"method", "service"})
|
||||
stats.MustRegister(inFlightGauge)
|
||||
err = stats.Register(inFlightGauge)
|
||||
if err != nil {
|
||||
are := prometheus.AlreadyRegisteredError{}
|
||||
if errors.As(err, &are) {
|
||||
inFlightGauge = are.ExistingCollector.(*prometheus.GaugeVec)
|
||||
} else {
|
||||
return clientMetrics{}, err
|
||||
}
|
||||
}
|
||||
|
||||
return clientMetrics{
|
||||
grpcMetrics: grpcMetrics,
|
||||
inFlightRPCs: inFlightGauge,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"github.com/jmhodges/clock"
|
||||
"github.com/letsencrypt/boulder/cmd"
|
||||
"github.com/letsencrypt/boulder/metrics"
|
||||
"github.com/letsencrypt/boulder/test"
|
||||
"google.golang.org/grpc"
|
||||
_ "google.golang.org/grpc/health"
|
||||
|
|
@ -29,7 +30,7 @@ func TestClientSetup(t *testing.T) {
|
|||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
client, err := ClientSetup(tt.cfg, &tls.Config{}, clientMetrics{}, clock.NewFake(), []grpc.UnaryClientInterceptor{}...)
|
||||
client, err := ClientSetup(tt.cfg, &tls.Config{}, metrics.NoopRegisterer, clock.NewFake(), []grpc.UnaryClientInterceptor{}...)
|
||||
if tt.wantErr {
|
||||
test.AssertError(t, err, "expected error, got nil")
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -32,7 +32,9 @@ func TestErrorWrapping(t *testing.T) {
|
|||
serverMetrics, err := newServerMetrics(metrics.NoopRegisterer)
|
||||
test.AssertNotError(t, err, "creating server metrics")
|
||||
si := newServerInterceptor(serverMetrics, clock.NewFake())
|
||||
ci := clientInterceptor{time.Second, NewClientMetrics(metrics.NoopRegisterer), clock.NewFake()}
|
||||
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
|
||||
test.AssertNotError(t, err, "creating client metrics")
|
||||
ci := clientInterceptor{time.Second, clientMetrics, clock.NewFake()}
|
||||
srv := grpc.NewServer(grpc.UnaryInterceptor(si.interceptUnary))
|
||||
es := &errorServer{}
|
||||
test_proto.RegisterChillerServer(srv, es)
|
||||
|
|
@ -73,7 +75,9 @@ func TestSubErrorWrapping(t *testing.T) {
|
|||
serverMetrics, err := newServerMetrics(metrics.NoopRegisterer)
|
||||
test.AssertNotError(t, err, "creating server metrics")
|
||||
si := newServerInterceptor(serverMetrics, clock.NewFake())
|
||||
ci := clientInterceptor{time.Second, NewClientMetrics(metrics.NoopRegisterer), clock.NewFake()}
|
||||
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
|
||||
test.AssertNotError(t, err, "creating client metrics")
|
||||
ci := clientInterceptor{time.Second, clientMetrics, clock.NewFake()}
|
||||
srv := grpc.NewServer(grpc.UnaryInterceptor(si.interceptUnary))
|
||||
es := &errorServer{}
|
||||
test_proto.RegisterChillerServer(srv, es)
|
||||
|
|
|
|||
|
|
@ -71,12 +71,15 @@ func TestServerInterceptor(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestClientInterceptor(t *testing.T) {
|
||||
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
|
||||
test.AssertNotError(t, err, "creating client metrics")
|
||||
ci := clientInterceptor{
|
||||
timeout: time.Second,
|
||||
metrics: NewClientMetrics(metrics.NoopRegisterer),
|
||||
metrics: clientMetrics,
|
||||
clk: clock.NewFake(),
|
||||
}
|
||||
err := ci.interceptUnary(context.Background(), "-service-test", nil, nil, nil, testInvoker)
|
||||
|
||||
err = ci.interceptUnary(context.Background(), "-service-test", nil, nil, nil, testInvoker)
|
||||
test.AssertNotError(t, err, "ci.intercept failed with a non-nil grpc.UnaryServerInfo")
|
||||
|
||||
err = ci.interceptUnary(context.Background(), "-service-brokeTest", nil, nil, nil, testInvoker)
|
||||
|
|
@ -101,9 +104,11 @@ func TestCancelTo408Interceptor(t *testing.T) {
|
|||
// timeout is reached, i.e. that FailFast is set to false.
|
||||
// https://github.com/grpc/grpc/blob/main/doc/wait-for-ready.md
|
||||
func TestFailFastFalse(t *testing.T) {
|
||||
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
|
||||
test.AssertNotError(t, err, "creating client metrics")
|
||||
ci := &clientInterceptor{
|
||||
timeout: 100 * time.Millisecond,
|
||||
metrics: NewClientMetrics(metrics.NoopRegisterer),
|
||||
metrics: clientMetrics,
|
||||
clk: clock.NewFake(),
|
||||
}
|
||||
conn, err := grpc.Dial("localhost:19876", // random, probably unused port
|
||||
|
|
@ -169,9 +174,11 @@ func TestTimeouts(t *testing.T) {
|
|||
defer s.Stop()
|
||||
|
||||
// make client
|
||||
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
|
||||
test.AssertNotError(t, err, "creating client metrics")
|
||||
ci := &clientInterceptor{
|
||||
timeout: 30 * time.Second,
|
||||
metrics: NewClientMetrics(metrics.NoopRegisterer),
|
||||
metrics: clientMetrics,
|
||||
clk: clock.NewFake(),
|
||||
}
|
||||
conn, err := grpc.Dial(net.JoinHostPort("localhost", strconv.Itoa(port)),
|
||||
|
|
@ -232,9 +239,11 @@ func TestRequestTimeTagging(t *testing.T) {
|
|||
defer s.Stop()
|
||||
|
||||
// Dial the ChillerServer
|
||||
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
|
||||
test.AssertNotError(t, err, "creating client metrics")
|
||||
ci := &clientInterceptor{
|
||||
timeout: 30 * time.Second,
|
||||
metrics: NewClientMetrics(metrics.NoopRegisterer),
|
||||
metrics: clientMetrics,
|
||||
clk: clk,
|
||||
}
|
||||
conn, err := grpc.Dial(net.JoinHostPort("localhost", strconv.Itoa(port)),
|
||||
|
|
@ -319,9 +328,11 @@ func TestInFlightRPCStat(t *testing.T) {
|
|||
defer s.Stop()
|
||||
|
||||
// Dial the ChillerServer
|
||||
clientMetrics, err := newClientMetrics(metrics.NoopRegisterer)
|
||||
test.AssertNotError(t, err, "creating client metrics")
|
||||
ci := &clientInterceptor{
|
||||
timeout: 30 * time.Second,
|
||||
metrics: NewClientMetrics(metrics.NoopRegisterer),
|
||||
metrics: clientMetrics,
|
||||
clk: clk,
|
||||
}
|
||||
conn, err := grpc.Dial(net.JoinHostPort("localhost", strconv.Itoa(port)),
|
||||
|
|
|
|||
|
|
@ -46,7 +46,6 @@ func main() {
|
|||
cmd.FailOnError(err, "failed to load TLS credentials")
|
||||
|
||||
// GRPC connection prerequisites.
|
||||
clientMetrics := bgrpc.NewClientMetrics(metrics.NoopRegisterer)
|
||||
clk := cmd.Clock()
|
||||
|
||||
// Health check retry and timeout.
|
||||
|
|
@ -65,7 +64,7 @@ func main() {
|
|||
c.GRPC.HostOverride = strings.Replace(hostOverride, ".service.consul", ".boulder", 1)
|
||||
|
||||
// Set up the GRPC connection.
|
||||
conn, err := bgrpc.ClientSetup(c.GRPC, tlsConfig, clientMetrics, clk)
|
||||
conn, err := bgrpc.ClientSetup(c.GRPC, tlsConfig, metrics.NoopRegisterer, clk)
|
||||
cmd.FailOnError(err, "failed to connect to service")
|
||||
client := healthpb.NewHealthClient(conn)
|
||||
ctx2, cancel2 := context.WithTimeout(ctx, c.GRPC.Timeout.Duration)
|
||||
|
|
|
|||
Loading…
Reference in New Issue