Restore gRPC metrics (#3265)
The go-grpc-prometheus package by default registers its metrics with Prometheus' global registry. In #3167, when we stopped using the global registry, we accidentally lost our gRPC metrics. This change adds them back. Specifically, it adds two convenience functions, one for clients and one for servers, that makes the necessary metrics object and registers it. We run these in the main function of each server. I considered adding these as part of StatsAndLogging, but the corresponding ClientMetrics and ServerMetrics objects (defined by go-grpc-prometheus) need to be subsequently made available during construction of the gRPC clients and servers. We could add them as fields on Scope, but this seemed like a little too much tight coupling. Also, update go-grpc-prometheus to get the necessary methods. ``` $ go test github.com/grpc-ecosystem/go-grpc-prometheus/... ok github.com/grpc-ecosystem/go-grpc-prometheus 0.069s ? github.com/grpc-ecosystem/go-grpc-prometheus/examples/testproto [no test files] ```
This commit is contained in:
parent
cda7b25c23
commit
68d5cc3331
|
|
@ -162,8 +162,8 @@
|
|||
},
|
||||
{
|
||||
"ImportPath": "github.com/grpc-ecosystem/go-grpc-prometheus",
|
||||
"Comment": "v1.1-1-g34abd90",
|
||||
"Rev": "34abd90a014618f61222a1b0a7b7eb834a2d0dc3"
|
||||
"Comment": "v1.1-8-g0dafe0d",
|
||||
"Rev": "0dafe0d496ea71181bf2dd039e7e3f44b6bd11a7"
|
||||
},
|
||||
{
|
||||
"ImportPath": "github.com/jmhodges/clock",
|
||||
|
|
|
|||
|
|
@ -70,7 +70,8 @@ func setupContext(c config) (core.RegistrationAuthority, blog.Logger, *gorp.DbMa
|
|||
cmd.FailOnError(err, "TLS config")
|
||||
}
|
||||
|
||||
raConn, err := bgrpc.ClientSetup(c.Revoker.RAService, tls, metrics.NewNoopScope())
|
||||
clientMetrics := bgrpc.NewClientMetrics(metrics.NewNoopScope())
|
||||
raConn, err := bgrpc.ClientSetup(c.Revoker.RAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to RA")
|
||||
rac := bgrpc.NewRegistrationAuthorityClient(rapb.NewRegistrationAuthorityClient(raConn))
|
||||
|
||||
|
|
@ -79,7 +80,7 @@ func setupContext(c config) (core.RegistrationAuthority, blog.Logger, *gorp.DbMa
|
|||
dbMap, err := sa.NewDbMap(dbURL, c.Revoker.DBConfig.MaxDBConns)
|
||||
cmd.FailOnError(err, "Couldn't setup database connection")
|
||||
|
||||
saConn, err := bgrpc.ClientSetup(c.Revoker.SAService, tls, metrics.NewNoopScope())
|
||||
saConn, err := bgrpc.ClientSetup(c.Revoker.SAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(saConn))
|
||||
|
||||
|
|
|
|||
|
|
@ -162,7 +162,8 @@ func main() {
|
|||
cmd.FailOnError(err, "TLS config")
|
||||
}
|
||||
|
||||
conn, err := bgrpc.ClientSetup(c.CA.SAService, tls, scope)
|
||||
clientMetrics := bgrpc.NewClientMetrics(scope)
|
||||
conn, err := bgrpc.ClientSetup(c.CA.SAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sa := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(conn))
|
||||
|
||||
|
|
@ -177,9 +178,10 @@ func main() {
|
|||
logger)
|
||||
cmd.FailOnError(err, "Failed to create CA impl")
|
||||
|
||||
serverMetrics := bgrpc.NewServerMetrics(scope)
|
||||
var caSrv *grpc.Server
|
||||
if c.CA.GRPCCA != nil {
|
||||
s, l, err := bgrpc.NewServer(c.CA.GRPCCA, tls, scope)
|
||||
s, l, err := bgrpc.NewServer(c.CA.GRPCCA, tls, serverMetrics)
|
||||
cmd.FailOnError(err, "Unable to setup CA gRPC server")
|
||||
caWrapper := bgrpc.NewCertificateAuthorityServer(cai)
|
||||
caPB.RegisterCertificateAuthorityServer(s, caWrapper)
|
||||
|
|
@ -191,7 +193,7 @@ func main() {
|
|||
}
|
||||
var ocspSrv *grpc.Server
|
||||
if c.CA.GRPCOCSPGenerator != nil {
|
||||
s, l, err := bgrpc.NewServer(c.CA.GRPCOCSPGenerator, tls, scope)
|
||||
s, l, err := bgrpc.NewServer(c.CA.GRPCOCSPGenerator, tls, serverMetrics)
|
||||
cmd.FailOnError(err, "Unable to setup CA gRPC server")
|
||||
caWrapper := bgrpc.NewCertificateAuthorityServer(cai)
|
||||
caPB.RegisterOCSPGeneratorServer(s, caWrapper)
|
||||
|
|
|
|||
|
|
@ -75,7 +75,8 @@ func main() {
|
|||
cmd.FailOnError(err, "TLS config")
|
||||
}
|
||||
|
||||
conn, err := bgrpc.ClientSetup(c.Publisher.SAService, tls, scope)
|
||||
clientMetrics := bgrpc.NewClientMetrics(scope)
|
||||
conn, err := bgrpc.ClientSetup(c.Publisher.SAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(conn))
|
||||
|
||||
|
|
@ -88,7 +89,8 @@ func main() {
|
|||
|
||||
var grpcSrv *grpc.Server
|
||||
if c.Publisher.GRPC != nil {
|
||||
s, l, err := bgrpc.NewServer(c.Publisher.GRPC, tls, scope)
|
||||
serverMetrics := bgrpc.NewServerMetrics(scope)
|
||||
s, l, err := bgrpc.NewServer(c.Publisher.GRPC, tls, serverMetrics)
|
||||
cmd.FailOnError(err, "Unable to setup Publisher gRPC server")
|
||||
gw := bgrpc.NewPublisherServerWrapper(pubi)
|
||||
pubPB.RegisterPublisherServer(s, gw)
|
||||
|
|
|
|||
|
|
@ -125,13 +125,14 @@ func main() {
|
|||
cmd.FailOnError(err, "TLS config")
|
||||
}
|
||||
|
||||
vaConn, err := bgrpc.ClientSetup(c.RA.VAService, tls, scope)
|
||||
clientMetrics := bgrpc.NewClientMetrics(scope)
|
||||
vaConn, err := bgrpc.ClientSetup(c.RA.VAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Unable to create VA client")
|
||||
vac := bgrpc.NewValidationAuthorityGRPCClient(vaConn)
|
||||
|
||||
caaClient := vaPB.NewCAAClient(vaConn)
|
||||
|
||||
caConn, err := bgrpc.ClientSetup(c.RA.CAService, tls, scope)
|
||||
caConn, err := bgrpc.ClientSetup(c.RA.CAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Unable to create CA client")
|
||||
// Build a CA client that is only capable of issuing certificates, not
|
||||
// signing OCSP. TODO(jsha): Once we've fully moved to gRPC, replace this
|
||||
|
|
@ -140,12 +141,12 @@ func main() {
|
|||
|
||||
var pubc core.Publisher
|
||||
if c.RA.PublisherService != nil {
|
||||
conn, err := bgrpc.ClientSetup(c.RA.PublisherService, tls, scope)
|
||||
conn, err := bgrpc.ClientSetup(c.RA.PublisherService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to Publisher")
|
||||
pubc = bgrpc.NewPublisherClientWrapper(pubPB.NewPublisherClient(conn))
|
||||
}
|
||||
|
||||
conn, err := bgrpc.ClientSetup(c.RA.SAService, tls, scope)
|
||||
conn, err := bgrpc.ClientSetup(c.RA.SAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(conn))
|
||||
|
||||
|
|
@ -215,8 +216,9 @@ func main() {
|
|||
|
||||
var grpcSrv *grpc.Server
|
||||
if c.RA.GRPC != nil {
|
||||
serverMetrics := bgrpc.NewServerMetrics(scope)
|
||||
var listener net.Listener
|
||||
grpcSrv, listener, err = bgrpc.NewServer(c.RA.GRPC, tls, scope)
|
||||
grpcSrv, listener, err = bgrpc.NewServer(c.RA.GRPC, tls, serverMetrics)
|
||||
cmd.FailOnError(err, "Unable to setup RA gRPC server")
|
||||
gw := bgrpc.NewRegistrationAuthorityServer(rai)
|
||||
rapb.RegisterRegistrationAuthorityServer(grpcSrv, gw)
|
||||
|
|
|
|||
|
|
@ -72,7 +72,8 @@ func main() {
|
|||
tls, err := c.SA.TLS.Load()
|
||||
cmd.FailOnError(err, "TLS config")
|
||||
var listener net.Listener
|
||||
grpcSrv, listener, err = bgrpc.NewServer(c.SA.GRPC, tls, scope)
|
||||
serverMetrics := bgrpc.NewServerMetrics(scope)
|
||||
grpcSrv, listener, err = bgrpc.NewServer(c.SA.GRPC, tls, serverMetrics)
|
||||
cmd.FailOnError(err, "Unable to setup SA gRPC server")
|
||||
gw := bgrpc.NewStorageAuthorityServer(sai)
|
||||
sapb.RegisterStorageAuthorityServer(grpcSrv, gw)
|
||||
|
|
|
|||
|
|
@ -109,10 +109,11 @@ func main() {
|
|||
tls, err := c.VA.TLS.Load()
|
||||
cmd.FailOnError(err, "TLS config")
|
||||
|
||||
clientMetrics := bgrpc.NewClientMetrics(scope)
|
||||
var remotes []va.RemoteVA
|
||||
if len(c.VA.RemoteVAs) > 0 {
|
||||
for _, rva := range c.VA.RemoteVAs {
|
||||
vaConn, err := bgrpc.ClientSetup(&rva, tls, scope)
|
||||
vaConn, err := bgrpc.ClientSetup(&rva, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Unable to create remote VA client")
|
||||
remotes = append(
|
||||
remotes,
|
||||
|
|
@ -136,7 +137,8 @@ func main() {
|
|||
clk,
|
||||
logger)
|
||||
|
||||
grpcSrv, l, err := bgrpc.NewServer(c.VA.GRPC, tls, scope)
|
||||
serverMetrics := bgrpc.NewServerMetrics(scope)
|
||||
grpcSrv, l, err := bgrpc.NewServer(c.VA.GRPC, tls, serverMetrics)
|
||||
cmd.FailOnError(err, "Unable to setup VA gRPC server")
|
||||
err = bgrpc.RegisterValidationAuthorityGRPCServer(grpcSrv, vai)
|
||||
cmd.FailOnError(err, "Unable to register VA gRPC server")
|
||||
|
|
|
|||
|
|
@ -70,11 +70,12 @@ func setupWFE(c config, logger blog.Logger, stats metrics.Scope) (core.Registrat
|
|||
cmd.FailOnError(err, "TLS config")
|
||||
}
|
||||
|
||||
raConn, err := bgrpc.ClientSetup(c.WFE.RAService, tls, stats)
|
||||
clientMetrics := bgrpc.NewClientMetrics(stats)
|
||||
raConn, err := bgrpc.ClientSetup(c.WFE.RAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to RA")
|
||||
rac := bgrpc.NewRegistrationAuthorityClient(rapb.NewRegistrationAuthorityClient(raConn))
|
||||
|
||||
saConn, err := bgrpc.ClientSetup(c.WFE.SAService, tls, stats)
|
||||
saConn, err := bgrpc.ClientSetup(c.WFE.SAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(saConn))
|
||||
|
||||
|
|
|
|||
|
|
@ -69,12 +69,12 @@ func setupWFE(c config, logger blog.Logger, stats metrics.Scope) (core.Registrat
|
|||
tls, err = c.WFE.TLS.Load()
|
||||
cmd.FailOnError(err, "TLS config")
|
||||
}
|
||||
|
||||
raConn, err := bgrpc.ClientSetup(c.WFE.RAService, tls, stats)
|
||||
clientMetrics := bgrpc.NewClientMetrics(stats)
|
||||
raConn, err := bgrpc.ClientSetup(c.WFE.RAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to RA")
|
||||
rac := bgrpc.NewRegistrationAuthorityClient(rapb.NewRegistrationAuthorityClient(raConn))
|
||||
|
||||
saConn, err := bgrpc.ClientSetup(c.WFE.SAService, tls, stats)
|
||||
saConn, err := bgrpc.ClientSetup(c.WFE.SAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(saConn))
|
||||
|
||||
|
|
|
|||
|
|
@ -485,7 +485,8 @@ func main() {
|
|||
cmd.FailOnError(err, "TLS config")
|
||||
}
|
||||
|
||||
conn, err := bgrpc.ClientSetup(c.Mailer.SAService, tls, scope)
|
||||
clientMetrics := bgrpc.NewClientMetrics(scope)
|
||||
conn, err := bgrpc.ClientSetup(c.Mailer.SAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(conn))
|
||||
|
||||
|
|
|
|||
|
|
@ -723,18 +723,19 @@ func setupClients(c cmd.OCSPUpdaterConfig, stats metrics.Scope) (
|
|||
tls, err = c.TLS.Load()
|
||||
cmd.FailOnError(err, "TLS config")
|
||||
}
|
||||
caConn, err := bgrpc.ClientSetup(c.OCSPGeneratorService, tls, stats)
|
||||
clientMetrics := bgrpc.NewClientMetrics(stats)
|
||||
caConn, err := bgrpc.ClientSetup(c.OCSPGeneratorService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to CA")
|
||||
// Make a CA client that is only capable of signing OCSP.
|
||||
// TODO(jsha): Once we've fully moved to gRPC, replace this
|
||||
// with a plain caPB.NewOCSPGeneratorClient.
|
||||
cac := bgrpc.NewCertificateAuthorityClient(nil, capb.NewOCSPGeneratorClient(caConn))
|
||||
|
||||
publisherConn, err := bgrpc.ClientSetup(c.Publisher, tls, stats)
|
||||
publisherConn, err := bgrpc.ClientSetup(c.Publisher, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create connection to service")
|
||||
pubc := bgrpc.NewPublisherClientWrapper(pubPB.NewPublisherClient(publisherConn))
|
||||
|
||||
conn, err := bgrpc.ClientSetup(c.SAService, tls, stats)
|
||||
conn, err := bgrpc.ClientSetup(c.SAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(conn))
|
||||
|
||||
|
|
|
|||
|
|
@ -133,7 +133,8 @@ func setup(configFile string) (blog.Logger, core.StorageAuthority) {
|
|||
cmd.FailOnError(err, "TLS config")
|
||||
}
|
||||
|
||||
conn, err := bgrpc.ClientSetup(conf.SAService, tls, metrics.NewNoopScope())
|
||||
clientMetrics := bgrpc.NewClientMetrics(metrics.NewNoopScope())
|
||||
conn, err := bgrpc.ClientSetup(conf.SAService, tls, clientMetrics)
|
||||
cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA")
|
||||
sac := bgrpc.NewStorageAuthorityClient(sapb.NewStorageAuthorityClient(conn))
|
||||
return logger, sac
|
||||
|
|
|
|||
|
|
@ -5,31 +5,26 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/letsencrypt/boulder/cmd"
|
||||
bcreds "github.com/letsencrypt/boulder/grpc/creds"
|
||||
"github.com/letsencrypt/boulder/metrics"
|
||||
)
|
||||
|
||||
// ClientSetup creates a gRPC TransportCredentials that presents
|
||||
// a client certificate and validates the the server certificate based
|
||||
// on the provided *tls.Config.
|
||||
// It dials the remote service and returns a grpc.ClientConn if successful.
|
||||
func ClientSetup(c *cmd.GRPCClientConfig, tls *tls.Config, stats metrics.Scope) (*grpc.ClientConn, error) {
|
||||
func ClientSetup(c *cmd.GRPCClientConfig, tls *tls.Config, clientMetrics *grpc_prometheus.ClientMetrics) (*grpc.ClientConn, error) {
|
||||
if len(c.ServerAddresses) == 0 {
|
||||
return nil, fmt.Errorf("boulder/grpc: ServerAddresses is empty")
|
||||
}
|
||||
if stats == nil {
|
||||
return nil, errNilScope
|
||||
}
|
||||
if tls == nil {
|
||||
return nil, errNilTLS
|
||||
}
|
||||
|
||||
grpc_prometheus.EnableClientHandlingTimeHistogram()
|
||||
|
||||
ci := clientInterceptor{c.Timeout.Duration}
|
||||
ci := clientInterceptor{c.Timeout.Duration, clientMetrics}
|
||||
creds := bcreds.NewClientCredentials(tls.RootCAs, tls.Certificates)
|
||||
return grpc.Dial(
|
||||
"", // Since our staticResolver provides addresses we don't need to pass an address here
|
||||
|
|
@ -38,3 +33,17 @@ func ClientSetup(c *cmd.GRPCClientConfig, tls *tls.Config, stats metrics.Scope)
|
|||
grpc.WithUnaryInterceptor(ci.intercept),
|
||||
)
|
||||
}
|
||||
|
||||
type registry interface {
|
||||
MustRegister(...prometheus.Collector)
|
||||
}
|
||||
|
||||
// NewClientMetrics constructs a *grpc_prometheus.ClientMetrics, registered with
|
||||
// the given registry, with timing histogram enabled. It must be called a
|
||||
// maximum of once per registry, or there will be conflicting names.
|
||||
func NewClientMetrics(stats registry) *grpc_prometheus.ClientMetrics {
|
||||
metrics := grpc_prometheus.NewClientMetrics()
|
||||
metrics.EnableClientHandlingTimeHistogram()
|
||||
stats.MustRegister(metrics)
|
||||
return metrics
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import (
|
|||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
berrors "github.com/letsencrypt/boulder/errors"
|
||||
testproto "github.com/letsencrypt/boulder/grpc/test_proto"
|
||||
"github.com/letsencrypt/boulder/test"
|
||||
|
|
@ -23,8 +24,8 @@ func (s *errorServer) Chill(_ context.Context, _ *testproto.Time) (*testproto.Ti
|
|||
}
|
||||
|
||||
func TestErrorWrapping(t *testing.T) {
|
||||
si := serverInterceptor{}
|
||||
ci := clientInterceptor{time.Second}
|
||||
si := serverInterceptor{grpc_prometheus.NewServerMetrics()}
|
||||
ci := clientInterceptor{time.Second, grpc_prometheus.NewClientMetrics()}
|
||||
srv := grpc.NewServer(grpc.UnaryInterceptor(si.intercept))
|
||||
es := &errorServer{}
|
||||
testproto.RegisterChillerServer(srv, es)
|
||||
|
|
|
|||
|
|
@ -14,13 +14,15 @@ import (
|
|||
// serverInterceptor is a gRPC interceptor that adds Prometheus
|
||||
// metrics to requests handled by a gRPC server, and wraps Boulder-specific
|
||||
// errors for transmission in a grpc/metadata trailer (see bcodes.go).
|
||||
type serverInterceptor struct{}
|
||||
type serverInterceptor struct {
|
||||
serverMetrics *grpc_prometheus.ServerMetrics
|
||||
}
|
||||
|
||||
func (si *serverInterceptor) intercept(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
if info == nil {
|
||||
return nil, berrors.InternalServerError("passed nil *grpc.UnaryServerInfo")
|
||||
}
|
||||
resp, err := grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler)
|
||||
resp, err := si.serverMetrics.UnaryServerInterceptor()(ctx, req, info, handler)
|
||||
if err != nil {
|
||||
err = wrapError(ctx, err)
|
||||
}
|
||||
|
|
@ -35,7 +37,8 @@ func (si *serverInterceptor) intercept(ctx context.Context, req interface{}, inf
|
|||
// comes back up within the timeout. Under gRPC the same effect is achieved by
|
||||
// retries up to the Context deadline.
|
||||
type clientInterceptor struct {
|
||||
timeout time.Duration
|
||||
timeout time.Duration
|
||||
clientMetrics *grpc_prometheus.ClientMetrics
|
||||
}
|
||||
|
||||
// intercept fulfils the grpc.UnaryClientInterceptor interface, it should be noted that while this API
|
||||
|
|
@ -56,7 +59,7 @@ func (ci *clientInterceptor) intercept(
|
|||
// Create grpc/metadata.Metadata to encode internal error type if one is returned
|
||||
md := metadata.New(nil)
|
||||
opts = append(opts, grpc.Trailer(&md))
|
||||
err := grpc_prometheus.UnaryClientInterceptor(localCtx, method, req, reply, cc, invoker, opts...)
|
||||
err := ci.clientMetrics.UnaryClientInterceptor()(localCtx, method, req, reply, cc, invoker, opts...)
|
||||
if err != nil {
|
||||
err = unwrapError(err, md)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/jmhodges/clock"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
|
@ -32,7 +33,7 @@ func testInvoker(_ context.Context, method string, _, _ interface{}, _ *grpc.Cli
|
|||
}
|
||||
|
||||
func TestServerInterceptor(t *testing.T) {
|
||||
si := serverInterceptor{}
|
||||
si := serverInterceptor{grpc_prometheus.NewServerMetrics()}
|
||||
|
||||
_, err := si.intercept(context.Background(), nil, nil, testHandler)
|
||||
test.AssertError(t, err, "si.intercept didn't fail with a nil grpc.UnaryServerInfo")
|
||||
|
|
@ -45,7 +46,7 @@ func TestServerInterceptor(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestClientInterceptor(t *testing.T) {
|
||||
ci := clientInterceptor{time.Second}
|
||||
ci := clientInterceptor{time.Second, grpc_prometheus.NewClientMetrics()}
|
||||
err := ci.intercept(context.Background(), "-service-test", nil, nil, nil, testInvoker)
|
||||
test.AssertNotError(t, err, "ci.intercept failed with a non-nil grpc.UnaryServerInfo")
|
||||
|
||||
|
|
@ -69,7 +70,7 @@ func (s *testServer) Chill(ctx context.Context, in *test_proto.Time) (*test_prot
|
|||
// timeout is reached, i.e. that FailFast is set to false.
|
||||
// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
|
||||
func TestFailFastFalse(t *testing.T) {
|
||||
ci := &clientInterceptor{100 * time.Millisecond}
|
||||
ci := &clientInterceptor{100 * time.Millisecond, grpc_prometheus.NewClientMetrics()}
|
||||
conn, err := grpc.Dial("localhost:19876", // random, probably unused port
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBalancer(grpc.RoundRobin(newStaticResolver([]string{"localhost:19000"}))),
|
||||
|
|
|
|||
|
|
@ -10,22 +10,21 @@ import (
|
|||
|
||||
"github.com/letsencrypt/boulder/cmd"
|
||||
bcreds "github.com/letsencrypt/boulder/grpc/creds"
|
||||
"github.com/letsencrypt/boulder/metrics"
|
||||
)
|
||||
|
||||
// CodedError is a alias required to appease go vet
|
||||
var CodedError = grpc.Errorf
|
||||
|
||||
var errNilScope = errors.New("boulder/grpc: received nil scope")
|
||||
var errNilMetrics = errors.New("boulder/grpc: received nil ServerMetrics")
|
||||
var errNilTLS = errors.New("boulder/grpc: received nil tls.Config")
|
||||
|
||||
// NewServer creates a gRPC server that uses the provided *tls.Config, and
|
||||
// verifies that clients present a certificate that (a) is signed by one of
|
||||
// the configured ClientCAs, and (b) contains at least one
|
||||
// subjectAlternativeName matching the accepted list from GRPCServerConfig.
|
||||
func NewServer(c *cmd.GRPCServerConfig, tls *tls.Config, stats metrics.Scope) (*grpc.Server, net.Listener, error) {
|
||||
if stats == nil {
|
||||
return nil, nil, errNilScope
|
||||
func NewServer(c *cmd.GRPCServerConfig, tls *tls.Config, serverMetrics *grpc_prometheus.ServerMetrics) (*grpc.Server, net.Listener, error) {
|
||||
if serverMetrics == nil {
|
||||
return nil, nil, errNilMetrics
|
||||
}
|
||||
if tls == nil {
|
||||
return nil, nil, errNilTLS
|
||||
|
|
@ -45,8 +44,16 @@ func NewServer(c *cmd.GRPCServerConfig, tls *tls.Config, stats metrics.Scope) (*
|
|||
return nil, nil, err
|
||||
}
|
||||
|
||||
grpc_prometheus.EnableHandlingTimeHistogram()
|
||||
|
||||
si := &serverInterceptor{}
|
||||
si := &serverInterceptor{serverMetrics}
|
||||
return grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(si.intercept)), l, nil
|
||||
}
|
||||
|
||||
// NewServerMetrics constructs a *grpc_prometheus.ServerMetrics, registered with
|
||||
// the given registry, with timing histogram enabled. It must be called a
|
||||
// maximum of once per registry, or there will be conflicting names.
|
||||
func NewServerMetrics(stats registry) *grpc_prometheus.ServerMetrics {
|
||||
metrics := grpc_prometheus.NewServerMetrics()
|
||||
metrics.EnableHandlingTimeHistogram()
|
||||
stats.MustRegister(metrics)
|
||||
return metrics
|
||||
}
|
||||
|
|
|
|||
|
|
@ -426,6 +426,9 @@ def test_stats():
|
|||
raise Exception("%s not present in %s" % (stat, url))
|
||||
expect_stat(8000, "\nresponse_time_count{")
|
||||
expect_stat(8000, "\ngo_goroutines ")
|
||||
expect_stat(8000, '\ngrpc_client_handling_seconds_count{grpc_method="NewRegistration",grpc_service="ra.RegistrationAuthority",grpc_type="unary"} ')
|
||||
expect_stat(8002, '\ngrpc_server_handling_seconds_sum{grpc_method="UpdateAuthorization",grpc_service="ra.RegistrationAuthority",grpc_type="unary"} ')
|
||||
expect_stat(8002, '\ngrpc_client_handling_seconds_count{grpc_method="UpdatePendingAuthorization",grpc_service="sa.StorageAuthority",grpc_type="unary"} ')
|
||||
expect_stat(8001, "\ngo_goroutines ")
|
||||
|
||||
exit_status = 1
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
# Created by .ignore support plugin (hsz.mobi)
|
||||
coverage.txt
|
||||
### Go template
|
||||
# Compiled Object files, Static and Dynamic libs (Shared Objects)
|
||||
*.o
|
||||
|
|
|
|||
|
|
@ -1,7 +1,10 @@
|
|||
sudo: false
|
||||
language: go
|
||||
go:
|
||||
- 1.6
|
||||
- 1.7
|
||||
- 1.6.x
|
||||
- 1.7.x
|
||||
- 1.8.x
|
||||
- master
|
||||
|
||||
install:
|
||||
- go get github.com/prometheus/client_golang/prometheus
|
||||
|
|
@ -10,4 +13,7 @@ install:
|
|||
- go get github.com/stretchr/testify
|
||||
|
||||
script:
|
||||
- go test -race -v ./...
|
||||
- ./test_all.sh
|
||||
|
||||
after_success:
|
||||
- bash <(curl -s https://codecov.io/bash)
|
||||
|
|
|
|||
|
|
@ -3,6 +3,8 @@
|
|||
[](https://travis-ci.org/grpc-ecosystem/go-grpc-prometheus)
|
||||
[](http://goreportcard.com/report/grpc-ecosystem/go-grpc-prometheus)
|
||||
[](https://godoc.org/github.com/grpc-ecosystem/go-grpc-prometheus)
|
||||
[](https://sourcegraph.com/github.com/grpc-ecosystem/go-grpc-prometheus/?badge)
|
||||
[](https://codecov.io/gh/grpc-ecosystem/go-grpc-prometheus)
|
||||
[](LICENSE)
|
||||
|
||||
[Prometheus](https://prometheus.io/) monitoring for your [gRPC Go](https://github.com/grpc/grpc-go) servers and clients.
|
||||
|
|
@ -36,7 +38,7 @@ import "github.com/grpc-ecosystem/go-grpc-prometheus"
|
|||
// After all your registrations, make sure all of the Prometheus metrics are initialized.
|
||||
grpc_prometheus.Register(myServer)
|
||||
// Register Prometheus metrics handler.
|
||||
http.Handle("/metrics", prometheus.Handler())
|
||||
http.Handle("/metrics", promhttp.Handler())
|
||||
...
|
||||
```
|
||||
|
||||
|
|
|
|||
|
|
@ -6,67 +6,34 @@
|
|||
package grpc_prometheus
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
|
||||
func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
monitor := newClientReporter(Unary, method)
|
||||
monitor.SentMessage()
|
||||
err := invoker(ctx, method, req, reply, cc, opts...)
|
||||
if err != nil {
|
||||
monitor.ReceivedMessage()
|
||||
}
|
||||
monitor.Handled(grpc.Code(err))
|
||||
return err
|
||||
var (
|
||||
// DefaultClientMetrics is the default instance of ClientMetrics. It is
|
||||
// intended to be used in conjunction the default Prometheus metrics
|
||||
// registry.
|
||||
DefaultClientMetrics = NewClientMetrics()
|
||||
|
||||
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
|
||||
UnaryClientInterceptor = DefaultClientMetrics.UnaryClientInterceptor()
|
||||
|
||||
// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
|
||||
StreamClientInterceptor = DefaultClientMetrics.StreamClientInterceptor()
|
||||
)
|
||||
|
||||
func init() {
|
||||
prom.MustRegister(DefaultClientMetrics.clientStartedCounter)
|
||||
prom.MustRegister(DefaultClientMetrics.clientHandledCounter)
|
||||
prom.MustRegister(DefaultClientMetrics.clientStreamMsgReceived)
|
||||
prom.MustRegister(DefaultClientMetrics.clientStreamMsgSent)
|
||||
}
|
||||
|
||||
// StreamServerInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
|
||||
func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
monitor := newClientReporter(clientStreamType(desc), method)
|
||||
clientStream, err := streamer(ctx, desc, cc, method, opts...)
|
||||
if err != nil {
|
||||
monitor.Handled(grpc.Code(err))
|
||||
return nil, err
|
||||
}
|
||||
return &monitoredClientStream{clientStream, monitor}, nil
|
||||
}
|
||||
|
||||
func clientStreamType(desc *grpc.StreamDesc) grpcType {
|
||||
if desc.ClientStreams && !desc.ServerStreams {
|
||||
return ClientStream
|
||||
} else if !desc.ClientStreams && desc.ServerStreams {
|
||||
return ServerStream
|
||||
}
|
||||
return BidiStream
|
||||
}
|
||||
|
||||
// monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
|
||||
type monitoredClientStream struct {
|
||||
grpc.ClientStream
|
||||
monitor *clientReporter
|
||||
}
|
||||
|
||||
func (s *monitoredClientStream) SendMsg(m interface{}) error {
|
||||
err := s.ClientStream.SendMsg(m)
|
||||
if err == nil {
|
||||
s.monitor.SentMessage()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *monitoredClientStream) RecvMsg(m interface{}) error {
|
||||
err := s.ClientStream.RecvMsg(m)
|
||||
if err == nil {
|
||||
s.monitor.ReceivedMessage()
|
||||
} else if err == io.EOF {
|
||||
s.monitor.Handled(codes.OK)
|
||||
} else {
|
||||
s.monitor.Handled(grpc.Code(err))
|
||||
}
|
||||
return err
|
||||
// EnableClientHandlingTimeHistogram turns on recording of handling time of
|
||||
// RPCs. Histogram metrics can be very expensive for Prometheus to retain and
|
||||
// query. This function acts on the DefaultClientMetrics variable and the
|
||||
// default Prometheus metrics registry.
|
||||
func EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
|
||||
DefaultClientMetrics.EnableClientHandlingTimeHistogram(opts...)
|
||||
prom.Register(DefaultClientMetrics.clientHandledHistogram)
|
||||
}
|
||||
|
|
|
|||
165
vendor/github.com/grpc-ecosystem/go-grpc-prometheus/client_metrics.go
generated
vendored
Normal file
165
vendor/github.com/grpc-ecosystem/go-grpc-prometheus/client_metrics.go
generated
vendored
Normal file
|
|
@ -0,0 +1,165 @@
|
|||
package grpc_prometheus
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
)
|
||||
|
||||
// ClientMetrics represents a collection of metrics to be registered on a
|
||||
// Prometheus metrics registry for a gRPC client.
|
||||
type ClientMetrics struct {
|
||||
clientStartedCounter *prom.CounterVec
|
||||
clientHandledCounter *prom.CounterVec
|
||||
clientStreamMsgReceived *prom.CounterVec
|
||||
clientStreamMsgSent *prom.CounterVec
|
||||
clientHandledHistogramEnabled bool
|
||||
clientHandledHistogramOpts prom.HistogramOpts
|
||||
clientHandledHistogram *prom.HistogramVec
|
||||
}
|
||||
|
||||
// NewClientMetrics returns a ClientMetrics object. Use a new instance of
|
||||
// ClientMetrics when not using the default Prometheus metrics registry, for
|
||||
// example when wanting to control which metrics are added to a registry as
|
||||
// opposed to automatically adding metrics via init functions.
|
||||
func NewClientMetrics() *ClientMetrics {
|
||||
return &ClientMetrics{
|
||||
clientStartedCounter: prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Name: "grpc_client_started_total",
|
||||
Help: "Total number of RPCs started on the client.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method"}),
|
||||
|
||||
clientHandledCounter: prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Name: "grpc_client_handled_total",
|
||||
Help: "Total number of RPCs completed by the client, regardless of success or failure.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
|
||||
|
||||
clientStreamMsgReceived: prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Name: "grpc_client_msg_received_total",
|
||||
Help: "Total number of RPC stream messages received by the client.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method"}),
|
||||
|
||||
clientStreamMsgSent: prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Name: "grpc_client_msg_sent_total",
|
||||
Help: "Total number of gRPC stream messages sent by the client.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method"}),
|
||||
|
||||
clientHandledHistogramEnabled: false,
|
||||
clientHandledHistogramOpts: prom.HistogramOpts{
|
||||
Name: "grpc_client_handling_seconds",
|
||||
Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
|
||||
Buckets: prom.DefBuckets,
|
||||
},
|
||||
clientHandledHistogram: nil,
|
||||
}
|
||||
}
|
||||
|
||||
// Describe sends the super-set of all possible descriptors of metrics
|
||||
// collected by this Collector to the provided channel and returns once
|
||||
// the last descriptor has been sent.
|
||||
func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
|
||||
m.clientStartedCounter.Describe(ch)
|
||||
m.clientHandledCounter.Describe(ch)
|
||||
m.clientStreamMsgReceived.Describe(ch)
|
||||
m.clientStreamMsgSent.Describe(ch)
|
||||
if m.clientHandledHistogramEnabled {
|
||||
m.clientHandledHistogram.Describe(ch)
|
||||
}
|
||||
}
|
||||
|
||||
// Collect is called by the Prometheus registry when collecting
|
||||
// metrics. The implementation sends each collected metric via the
|
||||
// provided channel and returns once the last metric has been sent.
|
||||
func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
|
||||
m.clientStartedCounter.Collect(ch)
|
||||
m.clientHandledCounter.Collect(ch)
|
||||
m.clientStreamMsgReceived.Collect(ch)
|
||||
m.clientStreamMsgSent.Collect(ch)
|
||||
if m.clientHandledHistogramEnabled {
|
||||
m.clientHandledHistogram.Collect(ch)
|
||||
}
|
||||
}
|
||||
|
||||
// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
|
||||
// Histogram metrics can be very expensive for Prometheus to retain and query.
|
||||
func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
|
||||
for _, o := range opts {
|
||||
o(&m.clientHandledHistogramOpts)
|
||||
}
|
||||
if !m.clientHandledHistogramEnabled {
|
||||
m.clientHandledHistogram = prom.NewHistogramVec(
|
||||
m.clientHandledHistogramOpts,
|
||||
[]string{"grpc_type", "grpc_service", "grpc_method"},
|
||||
)
|
||||
}
|
||||
m.clientHandledHistogramEnabled = true
|
||||
}
|
||||
|
||||
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
|
||||
func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
monitor := newClientReporter(m, Unary, method)
|
||||
monitor.SentMessage()
|
||||
err := invoker(ctx, method, req, reply, cc, opts...)
|
||||
if err != nil {
|
||||
monitor.ReceivedMessage()
|
||||
}
|
||||
monitor.Handled(grpc.Code(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// StreamServerInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
|
||||
func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
monitor := newClientReporter(m, clientStreamType(desc), method)
|
||||
clientStream, err := streamer(ctx, desc, cc, method, opts...)
|
||||
if err != nil {
|
||||
monitor.Handled(grpc.Code(err))
|
||||
return nil, err
|
||||
}
|
||||
return &monitoredClientStream{clientStream, monitor}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func clientStreamType(desc *grpc.StreamDesc) grpcType {
|
||||
if desc.ClientStreams && !desc.ServerStreams {
|
||||
return ClientStream
|
||||
} else if !desc.ClientStreams && desc.ServerStreams {
|
||||
return ServerStream
|
||||
}
|
||||
return BidiStream
|
||||
}
|
||||
|
||||
// monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
|
||||
type monitoredClientStream struct {
|
||||
grpc.ClientStream
|
||||
monitor *clientReporter
|
||||
}
|
||||
|
||||
func (s *monitoredClientStream) SendMsg(m interface{}) error {
|
||||
err := s.ClientStream.SendMsg(m)
|
||||
if err == nil {
|
||||
s.monitor.SentMessage()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *monitoredClientStream) RecvMsg(m interface{}) error {
|
||||
err := s.ClientStream.RecvMsg(m)
|
||||
if err == nil {
|
||||
s.monitor.ReceivedMessage()
|
||||
} else if err == io.EOF {
|
||||
s.monitor.Handled(codes.OK)
|
||||
} else {
|
||||
s.monitor.Handled(grpc.Code(err))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
@ -7,105 +7,40 @@ import (
|
|||
"time"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
var (
|
||||
clientStartedCounter = prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Namespace: "grpc",
|
||||
Subsystem: "client",
|
||||
Name: "started_total",
|
||||
Help: "Total number of RPCs started on the client.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method"})
|
||||
|
||||
clientHandledCounter = prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Namespace: "grpc",
|
||||
Subsystem: "client",
|
||||
Name: "handled_total",
|
||||
Help: "Total number of RPCs completed by the client, regardless of success or failure.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"})
|
||||
|
||||
clientStreamMsgReceived = prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Namespace: "grpc",
|
||||
Subsystem: "client",
|
||||
Name: "msg_received_total",
|
||||
Help: "Total number of RPC stream messages received by the client.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method"})
|
||||
|
||||
clientStreamMsgSent = prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Namespace: "grpc",
|
||||
Subsystem: "client",
|
||||
Name: "msg_sent_total",
|
||||
Help: "Total number of gRPC stream messages sent by the client.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method"})
|
||||
|
||||
clientHandledHistogramEnabled = false
|
||||
clientHandledHistogramOpts = prom.HistogramOpts{
|
||||
Namespace: "grpc",
|
||||
Subsystem: "client",
|
||||
Name: "handling_seconds",
|
||||
Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
|
||||
Buckets: prom.DefBuckets,
|
||||
}
|
||||
clientHandledHistogram *prom.HistogramVec
|
||||
)
|
||||
|
||||
func init() {
|
||||
prom.MustRegister(clientStartedCounter)
|
||||
prom.MustRegister(clientHandledCounter)
|
||||
prom.MustRegister(clientStreamMsgReceived)
|
||||
prom.MustRegister(clientStreamMsgSent)
|
||||
}
|
||||
|
||||
// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
|
||||
// Histogram metrics can be very expensive for Prometheus to retain and query.
|
||||
func EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
|
||||
for _, o := range opts {
|
||||
o(&clientHandledHistogramOpts)
|
||||
}
|
||||
if !clientHandledHistogramEnabled {
|
||||
clientHandledHistogram = prom.NewHistogramVec(
|
||||
clientHandledHistogramOpts,
|
||||
[]string{"grpc_type", "grpc_service", "grpc_method"},
|
||||
)
|
||||
prom.Register(clientHandledHistogram)
|
||||
}
|
||||
clientHandledHistogramEnabled = true
|
||||
}
|
||||
|
||||
type clientReporter struct {
|
||||
metrics *ClientMetrics
|
||||
rpcType grpcType
|
||||
serviceName string
|
||||
methodName string
|
||||
startTime time.Time
|
||||
}
|
||||
|
||||
func newClientReporter(rpcType grpcType, fullMethod string) *clientReporter {
|
||||
r := &clientReporter{rpcType: rpcType}
|
||||
if clientHandledHistogramEnabled {
|
||||
func newClientReporter(m *ClientMetrics, rpcType grpcType, fullMethod string) *clientReporter {
|
||||
r := &clientReporter{
|
||||
metrics: m,
|
||||
rpcType: rpcType,
|
||||
}
|
||||
if r.metrics.clientHandledHistogramEnabled {
|
||||
r.startTime = time.Now()
|
||||
}
|
||||
r.serviceName, r.methodName = splitMethodName(fullMethod)
|
||||
clientStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
|
||||
r.metrics.clientStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *clientReporter) ReceivedMessage() {
|
||||
clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
|
||||
r.metrics.clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
|
||||
}
|
||||
|
||||
func (r *clientReporter) SentMessage() {
|
||||
clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
|
||||
r.metrics.clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
|
||||
}
|
||||
|
||||
func (r *clientReporter) Handled(code codes.Code) {
|
||||
clientHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
|
||||
if clientHandledHistogramEnabled {
|
||||
clientHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
|
||||
r.metrics.clientHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
|
||||
if r.metrics.clientHandledHistogramEnabled {
|
||||
r.metrics.clientHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,69 +6,43 @@
|
|||
package grpc_prometheus
|
||||
|
||||
import (
|
||||
"golang.org/x/net/context"
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// PreregisterServices takes a gRPC server and pre-initializes all counters to 0.
|
||||
// This allows for easier monitoring in Prometheus (no missing metrics), and should be called *after* all services have
|
||||
// been registered with the server.
|
||||
var (
|
||||
// DefaultServerMetrics is the default instance of ServerMetrics. It is
|
||||
// intended to be used in conjunction the default Prometheus metrics
|
||||
// registry.
|
||||
DefaultServerMetrics = NewServerMetrics()
|
||||
|
||||
// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
|
||||
UnaryServerInterceptor = DefaultServerMetrics.UnaryServerInterceptor()
|
||||
|
||||
// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
|
||||
StreamServerInterceptor = DefaultServerMetrics.StreamServerInterceptor()
|
||||
)
|
||||
|
||||
func init() {
|
||||
prom.MustRegister(DefaultServerMetrics.serverStartedCounter)
|
||||
prom.MustRegister(DefaultServerMetrics.serverHandledCounter)
|
||||
prom.MustRegister(DefaultServerMetrics.serverStreamMsgReceived)
|
||||
prom.MustRegister(DefaultServerMetrics.serverStreamMsgSent)
|
||||
}
|
||||
|
||||
// Register takes a gRPC server and pre-initializes all counters to 0. This
|
||||
// allows for easier monitoring in Prometheus (no missing metrics), and should
|
||||
// be called *after* all services have been registered with the server. This
|
||||
// function acts on the DefaultServerMetrics variable.
|
||||
func Register(server *grpc.Server) {
|
||||
serviceInfo := server.GetServiceInfo()
|
||||
for serviceName, info := range serviceInfo {
|
||||
for _, mInfo := range info.Methods {
|
||||
preRegisterMethod(serviceName, &mInfo)
|
||||
}
|
||||
}
|
||||
DefaultServerMetrics.InitializeMetrics(server)
|
||||
}
|
||||
|
||||
// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
|
||||
func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
monitor := newServerReporter(Unary, info.FullMethod)
|
||||
monitor.ReceivedMessage()
|
||||
resp, err := handler(ctx, req)
|
||||
monitor.Handled(grpc.Code(err))
|
||||
if err == nil {
|
||||
monitor.SentMessage()
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
|
||||
func StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
monitor := newServerReporter(streamRpcType(info), info.FullMethod)
|
||||
err := handler(srv, &monitoredServerStream{ss, monitor})
|
||||
monitor.Handled(grpc.Code(err))
|
||||
return err
|
||||
}
|
||||
|
||||
func streamRpcType(info *grpc.StreamServerInfo) grpcType {
|
||||
if info.IsClientStream && !info.IsServerStream {
|
||||
return ClientStream
|
||||
} else if !info.IsClientStream && info.IsServerStream {
|
||||
return ServerStream
|
||||
}
|
||||
return BidiStream
|
||||
}
|
||||
|
||||
// monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters.
|
||||
type monitoredServerStream struct {
|
||||
grpc.ServerStream
|
||||
monitor *serverReporter
|
||||
}
|
||||
|
||||
func (s *monitoredServerStream) SendMsg(m interface{}) error {
|
||||
err := s.ServerStream.SendMsg(m)
|
||||
if err == nil {
|
||||
s.monitor.SentMessage()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *monitoredServerStream) RecvMsg(m interface{}) error {
|
||||
err := s.ServerStream.RecvMsg(m)
|
||||
if err == nil {
|
||||
s.monitor.ReceivedMessage()
|
||||
}
|
||||
return err
|
||||
// EnableHandlingTimeHistogram turns on recording of handling time
|
||||
// of RPCs. Histogram metrics can be very expensive for Prometheus
|
||||
// to retain and query. This function acts on the DefaultServerMetrics
|
||||
// variable and the default Prometheus metrics registry.
|
||||
func EnableHandlingTimeHistogram(opts ...HistogramOption) {
|
||||
DefaultServerMetrics.EnableHandlingTimeHistogram(opts...)
|
||||
prom.Register(DefaultServerMetrics.serverHandledHistogram)
|
||||
}
|
||||
|
|
|
|||
208
vendor/github.com/grpc-ecosystem/go-grpc-prometheus/server_metrics.go
generated
vendored
Normal file
208
vendor/github.com/grpc-ecosystem/go-grpc-prometheus/server_metrics.go
generated
vendored
Normal file
|
|
@ -0,0 +1,208 @@
|
|||
package grpc_prometheus
|
||||
|
||||
import (
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// ServerMetrics represents a collection of metrics to be registered on a
|
||||
// Prometheus metrics registry for a gRPC server.
|
||||
type ServerMetrics struct {
|
||||
serverStartedCounter *prom.CounterVec
|
||||
serverHandledCounter *prom.CounterVec
|
||||
serverStreamMsgReceived *prom.CounterVec
|
||||
serverStreamMsgSent *prom.CounterVec
|
||||
serverHandledHistogramEnabled bool
|
||||
serverHandledHistogramOpts prom.HistogramOpts
|
||||
serverHandledHistogram *prom.HistogramVec
|
||||
}
|
||||
|
||||
// NewServerMetrics returns a ServerMetrics object. Use a new instance of
|
||||
// ServerMetrics when not using the default Prometheus metrics registry, for
|
||||
// example when wanting to control which metrics are added to a registry as
|
||||
// opposed to automatically adding metrics via init functions.
|
||||
func NewServerMetrics() *ServerMetrics {
|
||||
return &ServerMetrics{
|
||||
serverStartedCounter: prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Name: "grpc_server_started_total",
|
||||
Help: "Total number of RPCs started on the server.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method"}),
|
||||
serverHandledCounter: prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Name: "grpc_server_handled_total",
|
||||
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
|
||||
serverStreamMsgReceived: prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Name: "grpc_server_msg_received_total",
|
||||
Help: "Total number of RPC stream messages received on the server.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method"}),
|
||||
serverStreamMsgSent: prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Name: "grpc_server_msg_sent_total",
|
||||
Help: "Total number of gRPC stream messages sent by the server.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method"}),
|
||||
serverHandledHistogramEnabled: false,
|
||||
serverHandledHistogramOpts: prom.HistogramOpts{
|
||||
Name: "grpc_server_handling_seconds",
|
||||
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
|
||||
Buckets: prom.DefBuckets,
|
||||
},
|
||||
serverHandledHistogram: nil,
|
||||
}
|
||||
}
|
||||
|
||||
type HistogramOption func(*prom.HistogramOpts)
|
||||
|
||||
// WithHistogramBuckets allows you to specify custom bucket ranges for histograms if EnableHandlingTimeHistogram is on.
|
||||
func WithHistogramBuckets(buckets []float64) HistogramOption {
|
||||
return func(o *prom.HistogramOpts) { o.Buckets = buckets }
|
||||
}
|
||||
|
||||
// EnableHandlingTimeHistogram enables histograms being registered when
|
||||
// registering the ServerMetrics on a Prometheus registry. Histograms can be
|
||||
// expensive on Prometheus servers. It takes options to configure histogram
|
||||
// options such as the defined buckets.
|
||||
func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
|
||||
for _, o := range opts {
|
||||
o(&m.serverHandledHistogramOpts)
|
||||
}
|
||||
if !m.serverHandledHistogramEnabled {
|
||||
m.serverHandledHistogram = prom.NewHistogramVec(
|
||||
m.serverHandledHistogramOpts,
|
||||
[]string{"grpc_type", "grpc_service", "grpc_method"},
|
||||
)
|
||||
}
|
||||
m.serverHandledHistogramEnabled = true
|
||||
}
|
||||
|
||||
// Describe sends the super-set of all possible descriptors of metrics
|
||||
// collected by this Collector to the provided channel and returns once
|
||||
// the last descriptor has been sent.
|
||||
func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
|
||||
m.serverStartedCounter.Describe(ch)
|
||||
m.serverHandledCounter.Describe(ch)
|
||||
m.serverStreamMsgReceived.Describe(ch)
|
||||
m.serverStreamMsgSent.Describe(ch)
|
||||
if m.serverHandledHistogramEnabled {
|
||||
m.serverHandledHistogram.Describe(ch)
|
||||
}
|
||||
}
|
||||
|
||||
// Collect is called by the Prometheus registry when collecting
|
||||
// metrics. The implementation sends each collected metric via the
|
||||
// provided channel and returns once the last metric has been sent.
|
||||
func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
|
||||
m.serverStartedCounter.Collect(ch)
|
||||
m.serverHandledCounter.Collect(ch)
|
||||
m.serverStreamMsgReceived.Collect(ch)
|
||||
m.serverStreamMsgSent.Collect(ch)
|
||||
if m.serverHandledHistogramEnabled {
|
||||
m.serverHandledHistogram.Collect(ch)
|
||||
}
|
||||
}
|
||||
|
||||
// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
|
||||
func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
monitor := newServerReporter(m, Unary, info.FullMethod)
|
||||
monitor.ReceivedMessage()
|
||||
resp, err := handler(ctx, req)
|
||||
monitor.Handled(grpc.Code(err))
|
||||
if err == nil {
|
||||
monitor.SentMessage()
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
}
|
||||
|
||||
// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
|
||||
func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
monitor := newServerReporter(m, streamRpcType(info), info.FullMethod)
|
||||
err := handler(srv, &monitoredServerStream{ss, monitor})
|
||||
monitor.Handled(grpc.Code(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// InitializeMetrics initializes all metrics, with their appropriate null
|
||||
// value, for all gRPC methods registered on a gRPC server. This is useful, to
|
||||
// ensure that all metrics exist when collecting and querying.
|
||||
func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
|
||||
serviceInfo := server.GetServiceInfo()
|
||||
for serviceName, info := range serviceInfo {
|
||||
for _, mInfo := range info.Methods {
|
||||
preRegisterMethod(m, serviceName, &mInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Register registers all server metrics in a given metrics registry. Depending
|
||||
// on histogram options and whether they are enabled, histogram metrics are
|
||||
// also registered.
|
||||
//
|
||||
// Deprecated: ServerMetrics implements Prometheus Collector interface. You can
|
||||
// register an instance of ServerMetrics directly by using
|
||||
// prometheus.Register(m).
|
||||
func (m *ServerMetrics) Register(r prom.Registerer) error {
|
||||
return r.Register(m)
|
||||
}
|
||||
|
||||
// MustRegister tries to register all server metrics and panics on an error.
|
||||
//
|
||||
// Deprecated: ServerMetrics implements Prometheus Collector interface. You can
|
||||
// register an instance of ServerMetrics directly by using
|
||||
// prometheus.MustRegister(m).
|
||||
func (m *ServerMetrics) MustRegister(r prom.Registerer) {
|
||||
r.MustRegister(m)
|
||||
}
|
||||
|
||||
func streamRpcType(info *grpc.StreamServerInfo) grpcType {
|
||||
if info.IsClientStream && !info.IsServerStream {
|
||||
return ClientStream
|
||||
} else if !info.IsClientStream && info.IsServerStream {
|
||||
return ServerStream
|
||||
}
|
||||
return BidiStream
|
||||
}
|
||||
|
||||
// monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters.
|
||||
type monitoredServerStream struct {
|
||||
grpc.ServerStream
|
||||
monitor *serverReporter
|
||||
}
|
||||
|
||||
func (s *monitoredServerStream) SendMsg(m interface{}) error {
|
||||
err := s.ServerStream.SendMsg(m)
|
||||
if err == nil {
|
||||
s.monitor.SentMessage()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *monitoredServerStream) RecvMsg(m interface{}) error {
|
||||
err := s.ServerStream.RecvMsg(m)
|
||||
if err == nil {
|
||||
s.monitor.ReceivedMessage()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated.
|
||||
func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) {
|
||||
methodName := mInfo.Name
|
||||
methodType := string(typeFromMethodInfo(mInfo))
|
||||
// These are just references (no increments), as just referencing will create the labels but not set values.
|
||||
metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
||||
metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
||||
metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
||||
if metrics.serverHandledHistogramEnabled {
|
||||
metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
||||
}
|
||||
for _, code := range allCodes {
|
||||
metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
|
||||
}
|
||||
}
|
||||
|
|
@ -7,151 +7,40 @@ import (
|
|||
"time"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type grpcType string
|
||||
|
||||
const (
|
||||
Unary grpcType = "unary"
|
||||
ClientStream grpcType = "client_stream"
|
||||
ServerStream grpcType = "server_stream"
|
||||
BidiStream grpcType = "bidi_stream"
|
||||
)
|
||||
|
||||
var (
|
||||
serverStartedCounter = prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Namespace: "grpc",
|
||||
Subsystem: "server",
|
||||
Name: "started_total",
|
||||
Help: "Total number of RPCs started on the server.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method"})
|
||||
|
||||
serverHandledCounter = prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Namespace: "grpc",
|
||||
Subsystem: "server",
|
||||
Name: "handled_total",
|
||||
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"})
|
||||
|
||||
serverStreamMsgReceived = prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Namespace: "grpc",
|
||||
Subsystem: "server",
|
||||
Name: "msg_received_total",
|
||||
Help: "Total number of RPC stream messages received on the server.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method"})
|
||||
|
||||
serverStreamMsgSent = prom.NewCounterVec(
|
||||
prom.CounterOpts{
|
||||
Namespace: "grpc",
|
||||
Subsystem: "server",
|
||||
Name: "msg_sent_total",
|
||||
Help: "Total number of gRPC stream messages sent by the server.",
|
||||
}, []string{"grpc_type", "grpc_service", "grpc_method"})
|
||||
|
||||
serverHandledHistogramEnabled = false
|
||||
serverHandledHistogramOpts = prom.HistogramOpts{
|
||||
Namespace: "grpc",
|
||||
Subsystem: "server",
|
||||
Name: "handling_seconds",
|
||||
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
|
||||
Buckets: prom.DefBuckets,
|
||||
}
|
||||
serverHandledHistogram *prom.HistogramVec
|
||||
)
|
||||
|
||||
func init() {
|
||||
prom.MustRegister(serverStartedCounter)
|
||||
prom.MustRegister(serverHandledCounter)
|
||||
prom.MustRegister(serverStreamMsgReceived)
|
||||
prom.MustRegister(serverStreamMsgSent)
|
||||
}
|
||||
|
||||
type HistogramOption func(*prom.HistogramOpts)
|
||||
|
||||
// WithHistogramBuckets allows you to specify custom bucket ranges for histograms if EnableHandlingTimeHistogram is on.
|
||||
func WithHistogramBuckets(buckets []float64) HistogramOption {
|
||||
return func(o *prom.HistogramOpts) { o.Buckets = buckets }
|
||||
}
|
||||
|
||||
// EnableHandlingTimeHistogram turns on recording of handling time of RPCs for server-side interceptors.
|
||||
// Histogram metrics can be very expensive for Prometheus to retain and query.
|
||||
func EnableHandlingTimeHistogram(opts ...HistogramOption) {
|
||||
for _, o := range opts {
|
||||
o(&serverHandledHistogramOpts)
|
||||
}
|
||||
if !serverHandledHistogramEnabled {
|
||||
serverHandledHistogram = prom.NewHistogramVec(
|
||||
serverHandledHistogramOpts,
|
||||
[]string{"grpc_type", "grpc_service", "grpc_method"},
|
||||
)
|
||||
prom.Register(serverHandledHistogram)
|
||||
}
|
||||
serverHandledHistogramEnabled = true
|
||||
}
|
||||
|
||||
type serverReporter struct {
|
||||
metrics *ServerMetrics
|
||||
rpcType grpcType
|
||||
serviceName string
|
||||
methodName string
|
||||
startTime time.Time
|
||||
}
|
||||
|
||||
func newServerReporter(rpcType grpcType, fullMethod string) *serverReporter {
|
||||
r := &serverReporter{rpcType: rpcType}
|
||||
if serverHandledHistogramEnabled {
|
||||
func newServerReporter(m *ServerMetrics, rpcType grpcType, fullMethod string) *serverReporter {
|
||||
r := &serverReporter{
|
||||
metrics: m,
|
||||
rpcType: rpcType,
|
||||
}
|
||||
if r.metrics.serverHandledHistogramEnabled {
|
||||
r.startTime = time.Now()
|
||||
}
|
||||
r.serviceName, r.methodName = splitMethodName(fullMethod)
|
||||
serverStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
|
||||
r.metrics.serverStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *serverReporter) ReceivedMessage() {
|
||||
serverStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
|
||||
r.metrics.serverStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
|
||||
}
|
||||
|
||||
func (r *serverReporter) SentMessage() {
|
||||
serverStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
|
||||
r.metrics.serverStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
|
||||
}
|
||||
|
||||
func (r *serverReporter) Handled(code codes.Code) {
|
||||
serverHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
|
||||
if serverHandledHistogramEnabled {
|
||||
serverHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
|
||||
r.metrics.serverHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
|
||||
if r.metrics.serverHandledHistogramEnabled {
|
||||
r.metrics.serverHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
// preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated.
|
||||
func preRegisterMethod(serviceName string, mInfo *grpc.MethodInfo) {
|
||||
methodName := mInfo.Name
|
||||
methodType := string(typeFromMethodInfo(mInfo))
|
||||
// These are just references (no increments), as just referencing will create the labels but not set values.
|
||||
serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
||||
serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
||||
serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
||||
if serverHandledHistogramEnabled {
|
||||
serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
|
||||
}
|
||||
for _, code := range allCodes {
|
||||
serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
|
||||
}
|
||||
}
|
||||
|
||||
func typeFromMethodInfo(mInfo *grpc.MethodInfo) grpcType {
|
||||
if mInfo.IsClientStream == false && mInfo.IsServerStream == false {
|
||||
return Unary
|
||||
}
|
||||
if mInfo.IsClientStream == true && mInfo.IsServerStream == false {
|
||||
return ClientStream
|
||||
}
|
||||
if mInfo.IsClientStream == false && mInfo.IsServerStream == true {
|
||||
return ServerStream
|
||||
}
|
||||
return BidiStream
|
||||
}
|
||||
|
|
|
|||
14
vendor/github.com/grpc-ecosystem/go-grpc-prometheus/test_all.sh
generated
vendored
Executable file
14
vendor/github.com/grpc-ecosystem/go-grpc-prometheus/test_all.sh
generated
vendored
Executable file
|
|
@ -0,0 +1,14 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
set -e
|
||||
echo "" > coverage.txt
|
||||
|
||||
for d in $(go list ./... | grep -v vendor); do
|
||||
echo -e "TESTS FOR: for \033[0;35m${d}\033[0m"
|
||||
go test -race -v -coverprofile=profile.coverage.out -covermode=atomic $d
|
||||
if [ -f profile.coverage.out ]; then
|
||||
cat profile.coverage.out >> coverage.txt
|
||||
rm profile.coverage.out
|
||||
fi
|
||||
echo ""
|
||||
done
|
||||
|
|
@ -6,9 +6,19 @@ package grpc_prometheus
|
|||
import (
|
||||
"strings"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
)
|
||||
|
||||
type grpcType string
|
||||
|
||||
const (
|
||||
Unary grpcType = "unary"
|
||||
ClientStream grpcType = "client_stream"
|
||||
ServerStream grpcType = "server_stream"
|
||||
BidiStream grpcType = "bidi_stream"
|
||||
)
|
||||
|
||||
var (
|
||||
allCodes = []codes.Code{
|
||||
codes.OK, codes.Canceled, codes.Unknown, codes.InvalidArgument, codes.DeadlineExceeded, codes.NotFound,
|
||||
|
|
@ -25,3 +35,16 @@ func splitMethodName(fullMethodName string) (string, string) {
|
|||
}
|
||||
return "unknown", "unknown"
|
||||
}
|
||||
|
||||
func typeFromMethodInfo(mInfo *grpc.MethodInfo) grpcType {
|
||||
if mInfo.IsClientStream == false && mInfo.IsServerStream == false {
|
||||
return Unary
|
||||
}
|
||||
if mInfo.IsClientStream == true && mInfo.IsServerStream == false {
|
||||
return ClientStream
|
||||
}
|
||||
if mInfo.IsClientStream == false && mInfo.IsServerStream == true {
|
||||
return ServerStream
|
||||
}
|
||||
return BidiStream
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue