Remove statsd version of RPC stats (#2693)
* Remove statsd-style RPC stats. * Remove tests for old code.
This commit is contained in:
parent
d99800ecb1
commit
d542960a35
|
@ -5,7 +5,6 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/jmhodges/clock"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/letsencrypt/boulder/cmd"
|
||||
|
@ -30,7 +29,7 @@ func ClientSetup(c *cmd.GRPCClientConfig, tls *tls.Config, stats metrics.Scope)
|
|||
|
||||
grpc_prometheus.EnableClientHandlingTimeHistogram()
|
||||
|
||||
ci := clientInterceptor{stats.NewScope("gRPCClient"), clock.Default(), c.Timeout.Duration}
|
||||
ci := clientInterceptor{c.Timeout.Duration}
|
||||
creds := bcreds.NewClientCredentials(tls.RootCAs, tls.Certificates)
|
||||
return grpc.Dial(
|
||||
"", // Since our staticResolver provides addresses we don't need to pass an address here
|
||||
|
|
|
@ -6,14 +6,12 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jmhodges/clock"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/letsencrypt/boulder/core"
|
||||
berrors "github.com/letsencrypt/boulder/errors"
|
||||
testproto "github.com/letsencrypt/boulder/grpc/test_proto"
|
||||
"github.com/letsencrypt/boulder/metrics"
|
||||
"github.com/letsencrypt/boulder/probs"
|
||||
"github.com/letsencrypt/boulder/test"
|
||||
)
|
||||
|
@ -27,10 +25,8 @@ func (s *errorServer) Chill(_ context.Context, _ *testproto.Time) (*testproto.Ti
|
|||
}
|
||||
|
||||
func TestErrorWrapping(t *testing.T) {
|
||||
fc := clock.NewFake()
|
||||
stats := metrics.NewNoopScope()
|
||||
si := serverInterceptor{stats, fc}
|
||||
ci := clientInterceptor{stats, fc, time.Second}
|
||||
si := serverInterceptor{}
|
||||
ci := clientInterceptor{time.Second}
|
||||
srv := grpc.NewServer(grpc.UnaryInterceptor(si.intercept))
|
||||
es := &errorServer{}
|
||||
testproto.RegisterChillerServer(srv, es)
|
||||
|
|
|
@ -1,67 +1,40 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/jmhodges/clock"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
berrors "github.com/letsencrypt/boulder/errors"
|
||||
"github.com/letsencrypt/boulder/metrics"
|
||||
)
|
||||
|
||||
// serverInterceptor is a gRPC interceptor that adds statsd and Prometheus
|
||||
// metrics to requests handled by a gRPC server.
|
||||
type serverInterceptor struct {
|
||||
stats metrics.Scope
|
||||
clk clock.Clock
|
||||
}
|
||||
|
||||
func cleanMethod(m string, trimService bool) string {
|
||||
m = strings.TrimLeft(m, "-")
|
||||
m = strings.Replace(m, "/", "_", -1)
|
||||
if trimService {
|
||||
s := strings.Split(m, "-")
|
||||
if len(s) == 1 {
|
||||
return m
|
||||
}
|
||||
return s[len(s)-1]
|
||||
}
|
||||
return strings.Replace(m, "-", "_", -1)
|
||||
}
|
||||
// serverInterceptor is a gRPC interceptor that adds Prometheus
|
||||
// metrics to requests handled by a gRPC server, and wraps Boulder-specific
|
||||
// errors for transmission in a grpc/metadata trailer (see bcodes.go).
|
||||
type serverInterceptor struct{}
|
||||
|
||||
func (si *serverInterceptor) intercept(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
if info == nil {
|
||||
si.stats.Inc("NoInfo", 1)
|
||||
return nil, berrors.InternalServerError("passed nil *grpc.UnaryServerInfo")
|
||||
}
|
||||
s := si.clk.Now()
|
||||
methodScope := si.stats.NewScope(cleanMethod(info.FullMethod, true))
|
||||
methodScope.Inc("Calls", 1)
|
||||
methodScope.GaugeDelta("InProgress", 1)
|
||||
resp, err := grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler)
|
||||
methodScope.TimingDuration("Latency", si.clk.Since(s))
|
||||
methodScope.GaugeDelta("InProgress", -1)
|
||||
if err != nil {
|
||||
methodScope.Inc("Failed", 1)
|
||||
err = wrapError(ctx, err)
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// clientInterceptor is a gRPC interceptor that adds statsd and Prometheus
|
||||
// metrics to sent requests, and disables FailFast. We disable FailFast to
|
||||
// minimize disruption during restarts: if a client makes a request while
|
||||
// all backends are briefly down, the request doesn't necessarily fail. A
|
||||
// backend can service the request if it comes back up within the Context
|
||||
// deadline.
|
||||
// clientInterceptor is a gRPC interceptor that adds Prometheus
|
||||
// metrics to sent requests, and disables FailFast. We disable FailFast because
|
||||
// non-FailFast mode is most similar to the old AMQP RPC layer: If a client
|
||||
// makes a request while all backends are briefly down (e.g. for a restart), the
|
||||
// request doesn't necessarily fail. A backend can service the request if it
|
||||
// comes back up within the timeout. Under gRPC the same effect is achieved by
|
||||
// retries up to the Context deadline.
|
||||
type clientInterceptor struct {
|
||||
stats metrics.Scope
|
||||
clk clock.Clock
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
|
@ -77,10 +50,6 @@ func (ci *clientInterceptor) intercept(
|
|||
opts ...grpc.CallOption) error {
|
||||
localCtx, cancel := context.WithTimeout(ctx, ci.timeout)
|
||||
defer cancel()
|
||||
s := ci.clk.Now()
|
||||
methodScope := ci.stats.NewScope(cleanMethod(method, false))
|
||||
methodScope.Inc("Calls", 1)
|
||||
methodScope.GaugeDelta("InProgress", 1)
|
||||
// Disable fail-fast so RPCs will retry until deadline, even if all backends
|
||||
// are down.
|
||||
opts = append(opts, grpc.FailFast(false))
|
||||
|
@ -88,10 +57,7 @@ func (ci *clientInterceptor) intercept(
|
|||
md := metadata.New(nil)
|
||||
opts = append(opts, grpc.Trailer(&md))
|
||||
err := grpc_prometheus.UnaryClientInterceptor(localCtx, method, req, reply, cc, invoker, opts...)
|
||||
methodScope.TimingDuration("Latency", ci.clk.Since(s))
|
||||
methodScope.GaugeDelta("InProgress", -1)
|
||||
if err != nil {
|
||||
methodScope.Inc("Failed", 1)
|
||||
err = unwrapError(err, md)
|
||||
}
|
||||
return err
|
||||
|
|
|
@ -5,13 +5,11 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/jmhodges/clock"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/letsencrypt/boulder/grpc/test_proto"
|
||||
"github.com/letsencrypt/boulder/metrics"
|
||||
"github.com/letsencrypt/boulder/test"
|
||||
)
|
||||
|
||||
|
@ -34,51 +32,23 @@ func testInvoker(_ context.Context, method string, _, _ interface{}, _ *grpc.Cli
|
|||
}
|
||||
|
||||
func TestServerInterceptor(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
statter := metrics.NewMockStatter(ctrl)
|
||||
stats := metrics.NewStatsdScope(statter, "fake", "gRPCServer")
|
||||
si := serverInterceptor{stats, fc}
|
||||
si := serverInterceptor{}
|
||||
|
||||
statter.EXPECT().Inc("fake.gRPCServer.NoInfo", int64(1), float32(1.0)).Return(nil)
|
||||
_, err := si.intercept(context.Background(), nil, nil, testHandler)
|
||||
test.AssertError(t, err, "si.intercept didn't fail with a nil grpc.UnaryServerInfo")
|
||||
|
||||
statter.EXPECT().Inc("fake.gRPCServer.test.Calls", int64(1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().GaugeDelta("fake.gRPCServer.test.InProgress", int64(1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().TimingDuration("fake.gRPCServer.test.Latency", time.Second, float32(1.0)).Return(nil)
|
||||
statter.EXPECT().GaugeDelta("fake.gRPCServer.test.InProgress", int64(-1), float32(1.0)).Return(nil)
|
||||
_, err = si.intercept(context.Background(), nil, &grpc.UnaryServerInfo{FullMethod: "-service-test"}, testHandler)
|
||||
test.AssertNotError(t, err, "si.intercept failed with a non-nil grpc.UnaryServerInfo")
|
||||
|
||||
statter.EXPECT().Inc("fake.gRPCServer.brokeTest.Calls", int64(1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().GaugeDelta("fake.gRPCServer.brokeTest.InProgress", int64(1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().TimingDuration("fake.gRPCServer.brokeTest.Latency", time.Duration(0), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().GaugeDelta("fake.gRPCServer.brokeTest.InProgress", int64(-1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().Inc("fake.gRPCServer.brokeTest.Failed", int64(1), float32(1.0)).Return(nil)
|
||||
_, err = si.intercept(context.Background(), 0, &grpc.UnaryServerInfo{FullMethod: "brokeTest"}, testHandler)
|
||||
test.AssertError(t, err, "si.intercept didn't fail when handler returned a error")
|
||||
}
|
||||
|
||||
func TestClientInterceptor(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
statter := metrics.NewMockStatter(ctrl)
|
||||
stats := metrics.NewStatsdScope(statter, "fake", "gRPCClient")
|
||||
ci := clientInterceptor{stats, fc, time.Second}
|
||||
|
||||
statter.EXPECT().Inc("fake.gRPCClient.service_test.Calls", int64(1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().GaugeDelta("fake.gRPCClient.service_test.InProgress", int64(1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().TimingDuration("fake.gRPCClient.service_test.Latency", time.Second, float32(1.0)).Return(nil)
|
||||
statter.EXPECT().GaugeDelta("fake.gRPCClient.service_test.InProgress", int64(-1), float32(1.0)).Return(nil)
|
||||
ci := clientInterceptor{time.Second}
|
||||
err := ci.intercept(context.Background(), "-service-test", nil, nil, nil, testInvoker)
|
||||
test.AssertNotError(t, err, "ci.intercept failed with a non-nil grpc.UnaryServerInfo")
|
||||
|
||||
statter.EXPECT().Inc("fake.gRPCClient.service_brokeTest.Calls", int64(1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().GaugeDelta("fake.gRPCClient.service_brokeTest.InProgress", int64(1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().TimingDuration("fake.gRPCClient.service_brokeTest.Latency", time.Duration(0), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().GaugeDelta("fake.gRPCClient.service_brokeTest.InProgress", int64(-1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().Inc("fake.gRPCClient.service_brokeTest.Failed", int64(1), float32(1.0)).Return(nil)
|
||||
err = ci.intercept(context.Background(), "-service-brokeTest", nil, nil, nil, testInvoker)
|
||||
test.AssertError(t, err, "ci.intercept didn't fail when handler returned a error")
|
||||
}
|
||||
|
@ -99,7 +69,7 @@ func (s *testServer) Chill(ctx context.Context, in *test_proto.Time) (*test_prot
|
|||
// timeout is reached, i.e. that FailFast is set to false.
|
||||
// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
|
||||
func TestFailFastFalse(t *testing.T) {
|
||||
ci := &clientInterceptor{metrics.NewNoopScope(), clock.Default(), 100 * time.Millisecond}
|
||||
ci := &clientInterceptor{100 * time.Millisecond}
|
||||
conn, err := grpc.Dial("localhost:19876", // random, probably unused port
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBalancer(grpc.RoundRobin(newStaticResolver([]string{"localhost:19000"}))),
|
||||
|
@ -120,23 +90,3 @@ func TestFailFastFalse(t *testing.T) {
|
|||
}
|
||||
_ = conn.Close()
|
||||
}
|
||||
|
||||
func TestCleanMethod(t *testing.T) {
|
||||
tests := []struct {
|
||||
in string
|
||||
out string
|
||||
stripService bool
|
||||
}{
|
||||
{"-ServiceName-MethodName", "ServiceName_MethodName", false},
|
||||
{"-ServiceName-MethodName", "MethodName", true},
|
||||
{"--MethodName", "MethodName", true},
|
||||
{"--MethodName", "MethodName", true},
|
||||
{"MethodName", "MethodName", false},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
out := cleanMethod(tc.in, tc.stripService)
|
||||
if out != tc.out {
|
||||
t.Fatalf("cleanMethod didn't return the expected name: expected: %q, got: %q", tc.out, out)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"net"
|
||||
|
||||
"github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||
"github.com/jmhodges/clock"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/letsencrypt/boulder/cmd"
|
||||
|
@ -48,6 +47,6 @@ func NewServer(c *cmd.GRPCServerConfig, tls *tls.Config, stats metrics.Scope) (*
|
|||
|
||||
grpc_prometheus.EnableHandlingTimeHistogram()
|
||||
|
||||
si := &serverInterceptor{stats.NewScope("gRPCServer"), clock.Default()}
|
||||
si := &serverInterceptor{}
|
||||
return grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(si.intercept)), l, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue