Check if metrics enabled for HTTP middleware and gRPC interceptors (#1538)

Co-authored-by: Young Bu Park <youngp@microsoft.com>
This commit is contained in:
Sashi Kumar 2020-05-11 17:01:53 +02:00 committed by GitHub
parent 8864ab7adc
commit 5b01b2a2a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 52 additions and 22 deletions

View File

@ -552,9 +552,12 @@ func (a *actorsRuntime) getPlacementClientPersistently(placementAddress, hostAdd
log.Errorf("failed to establish TLS credentials for actor placement service: %s", err)
return nil
}
opts = append(
opts,
grpc.WithUnaryInterceptor(diag.DefaultGRPCMonitoring.UnaryClientInterceptor()))
if diag.DefaultGRPCMonitoring.IsEnabled() {
opts = append(
opts,
grpc.WithUnaryInterceptor(diag.DefaultGRPCMonitoring.UnaryClientInterceptor()))
}
conn, err := grpc.Dial(
placementAddress,

View File

@ -135,6 +135,10 @@ func (g *grpcMetrics) Init(appID string) error {
return view.Register(views...)
}
func (g *grpcMetrics) IsEnabled() bool {
return g.enabled
}
func (g *grpcMetrics) ServerRequestReceived(ctx context.Context, method string, contentSize int64) time.Time {
if g.enabled {
stats.RecordWithTags(

View File

@ -83,6 +83,10 @@ func newHTTPMetrics() *httpMetrics {
}
}
func (h *httpMetrics) IsEnabled() bool {
return h.enabled
}
func (h *httpMetrics) ServerRequestReceived(ctx context.Context, method, path string, contentSize int64) {
if h.enabled {
stats.RecordWithTags(

View File

@ -23,9 +23,9 @@ var (
// DefaultMonitoring holds service monitoring metrics definitions
DefaultMonitoring = newServiceMetrics()
// DefaultGRPCMonitoring holds default gRPC monitoring handlers and middleswares
// DefaultGRPCMonitoring holds default gRPC monitoring handlers and middlewares
DefaultGRPCMonitoring = newGRPCMetrics()
// DefaultHTTPMonitoring holds default HTTP monitoring handlers and middleswares
// DefaultHTTPMonitoring holds default HTTP monitoring handlers and middlewares
DefaultHTTPMonitoring = newHTTPMetrics()
)

View File

@ -74,10 +74,13 @@ func (g *Manager) GetGRPCConnection(address, id string, skipTLS, recreateIfExist
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithUnaryInterceptor(diag.DefaultGRPCMonitoring.UnaryClientInterceptor()),
grpc.WithDefaultServiceConfig(grpcServiceConfig),
}
if diag.DefaultGRPCMonitoring.IsEnabled() {
opts = append(opts, grpc.WithUnaryInterceptor(diag.DefaultGRPCMonitoring.UnaryClientInterceptor()))
}
if !skipTLS && g.auth != nil {
signedCert := g.auth.GetCurrentSignedCert()
cert, err := tls.X509KeyPair(signedCert.WorkloadCert, signedCert.PrivateKeyPem)

View File

@ -137,14 +137,19 @@ func (s *server) getMiddlewareOptions() []grpc_go.ServerOption {
opts := []grpc_go.ServerOption{}
s.logger.Infof("enabled monitoring middleware.")
unaryChains := grpc_middleware.ChainUnaryServer(
diag.SetTracingSpanContextGRPCMiddlewareUnary(s.tracingSpec),
diag.DefaultGRPCMonitoring.UnaryServerInterceptor(),
)
unaryServerInterceptor := diag.SetTracingSpanContextGRPCMiddlewareUnary(s.tracingSpec)
if diag.DefaultGRPCMonitoring.IsEnabled() {
unaryServerInterceptor = grpc_middleware.ChainUnaryServer(
unaryServerInterceptor,
diag.DefaultGRPCMonitoring.UnaryServerInterceptor(),
)
}
opts = append(
opts,
grpc_go.StreamInterceptor(diag.SetTracingSpanContextGRPCMiddlewareStream(s.tracingSpec)),
grpc_go.UnaryInterceptor(unaryChains))
grpc_go.UnaryInterceptor(unaryServerInterceptor))
return opts
}

View File

@ -73,7 +73,10 @@ func (s *server) useTracing(next fasthttp.RequestHandler) fasthttp.RequestHandle
}
func (s *server) useMetrics(next fasthttp.RequestHandler) fasthttp.RequestHandler {
return diag.DefaultHTTPMonitoring.FastHTTPMiddleware(next)
if diag.DefaultHTTPMonitoring.IsEnabled() {
return diag.DefaultHTTPMonitoring.FastHTTPMiddleware(next)
}
return next
}
func (s *server) useRouter() fasthttp.RequestHandler {

View File

@ -17,12 +17,16 @@ import (
// GetOperatorClient returns a new k8s operator client and the underlying connection.
// If a cert chain is given, a TLS connection will be established.
func GetOperatorClient(address, serverName string, certChain *dapr_credentials.CertChain) (operatorv1pb.OperatorClient, *grpc.ClientConn, error) {
unaryChain := grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
diag.DefaultGRPCMonitoring.UnaryClientInterceptor(),
)
unaryClientInterceptor := grpc_retry.UnaryClientInterceptor()
opts := []grpc.DialOption{grpc.WithUnaryInterceptor(unaryChain)}
if diag.DefaultGRPCMonitoring.IsEnabled() {
unaryClientInterceptor = grpc_middleware.ChainUnaryClient(
unaryClientInterceptor,
diag.DefaultGRPCMonitoring.UnaryClientInterceptor(),
)
}
opts := []grpc.DialOption{grpc.WithUnaryInterceptor(unaryClientInterceptor)}
if certChain != nil {
cp := x509.NewCertPool()

View File

@ -88,15 +88,19 @@ func (a *authenticator) CreateSignedWorkloadCert(id string) (*SignedCertificate,
return nil, fmt.Errorf("failed to create tls config from cert and key: %s", err)
}
unaryChain := grpc_middleware.ChainUnaryClient(
grpc_retry.UnaryClientInterceptor(),
diag.DefaultGRPCMonitoring.UnaryClientInterceptor(),
)
unaryClientInterceptor := grpc_retry.UnaryClientInterceptor()
if diag.DefaultGRPCMonitoring.IsEnabled() {
unaryClientInterceptor = grpc_middleware.ChainUnaryClient(
unaryClientInterceptor,
diag.DefaultGRPCMonitoring.UnaryClientInterceptor(),
)
}
conn, err := grpc.Dial(
a.sentryAddress,
grpc.WithTransportCredentials(credentials.NewTLS(config)),
grpc.WithUnaryInterceptor(unaryChain))
grpc.WithUnaryInterceptor(unaryClientInterceptor))
if err != nil {
diag.DefaultMonitoring.MTLSWorkLoadCertRotationFailed("sentry_conn")
return nil, fmt.Errorf("error establishing connection to sentry: %s", err)