Add gRPC server-side interceptor (#1933)
Adds a server side unary RPC interceptor which includes basic stats. We could also use this to add a server request ID to the context.Context to identify the call through the system, but really I'd rather do that on the client side before the RPC is sent which requires the client interceptor implementation upstream. Also updates google.golang.org/grpc. Updates #1880.
This commit is contained in:
parent
67fd6ef67c
commit
7b29dba75d
|
@ -48,7 +48,7 @@ func main() {
|
|||
cmd.FailOnError(err, "Unable to create SA client")
|
||||
|
||||
if c.Publisher.GRPC != nil {
|
||||
s, l, err := bgrpc.NewServer(c.Publisher.GRPC)
|
||||
s, l, err := bgrpc.NewServer(c.Publisher.GRPC, metrics.NewStatsdScope(stats, "Publisher"))
|
||||
cmd.FailOnError(err, "Failed to setup gRPC server")
|
||||
gw := bgrpc.NewPublisherServerWrapper(pubi)
|
||||
pubPB.RegisterPublisherServer(s, gw)
|
||||
|
|
|
@ -91,7 +91,7 @@ func main() {
|
|||
amqpConf := c.VA.AMQP
|
||||
|
||||
if c.VA.GRPC != nil {
|
||||
s, l, err := bgrpc.NewServer(c.VA.GRPC)
|
||||
s, l, err := bgrpc.NewServer(c.VA.GRPC, metrics.NewStatsdScope(stats, "VA"))
|
||||
cmd.FailOnError(err, "Unable to setup VA gRPC server")
|
||||
err = bgrpc.RegisterValidationAuthorityGRPCServer(s, vai)
|
||||
cmd.FailOnError(err, "Unable to register VA gRPC server")
|
||||
|
|
|
@ -23,7 +23,7 @@ import (
|
|||
|
||||
type caaCheckerServer struct {
|
||||
resolver bdns.DNSResolver
|
||||
stats statsd.Statter
|
||||
stats metrics.Scope
|
||||
}
|
||||
|
||||
// caaSet consists of filtered CAA records
|
||||
|
@ -150,12 +150,12 @@ func (ccs *caaCheckerServer) checkCAA(ctx context.Context, hostname string, issu
|
|||
|
||||
if caaSet.criticalUnknown() {
|
||||
// Contains unknown critical directives.
|
||||
ccs.stats.Inc("CCS.UnknownCritical", 1, 1.0)
|
||||
ccs.stats.Inc("CCS.UnknownCritical", 1)
|
||||
return true, false, nil
|
||||
}
|
||||
|
||||
if len(caaSet.Unknown) > 0 {
|
||||
ccs.stats.Inc("CCS.WithUnknownNoncritical", 1, 1.0)
|
||||
ccs.stats.Inc("CCS.WithUnknownNoncritical", 1)
|
||||
}
|
||||
|
||||
if len(caaSet.Issue) == 0 {
|
||||
|
@ -163,7 +163,7 @@ func (ccs *caaCheckerServer) checkCAA(ctx context.Context, hostname string, issu
|
|||
// (e.g. there is only an issuewild directive, but we are checking for a
|
||||
// non-wildcard identifier, or there is only an iodef or non-critical unknown
|
||||
// directive.)
|
||||
ccs.stats.Inc("CCS.CAA.NoneRelevant", 1, 1.0)
|
||||
ccs.stats.Inc("CCS.CAA.NoneRelevant", 1)
|
||||
return true, true, nil
|
||||
}
|
||||
|
||||
|
@ -174,13 +174,13 @@ func (ccs *caaCheckerServer) checkCAA(ctx context.Context, hostname string, issu
|
|||
// Our CAA identity must be found in the chosen checkSet.
|
||||
for _, caa := range caaSet.Issue {
|
||||
if extractIssuerDomain(caa) == issuer {
|
||||
ccs.stats.Inc("CCS.CAA.Authorized", 1, 1.0)
|
||||
ccs.stats.Inc("CCS.CAA.Authorized", 1)
|
||||
return true, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// The list of authorized issuers is non-empty, but we are not in it. Fail.
|
||||
ccs.stats.Inc("CCS.CAA.Unauthorized", 1, 1.0)
|
||||
ccs.stats.Inc("CCS.CAA.Unauthorized", 1)
|
||||
return true, false, nil
|
||||
}
|
||||
|
||||
|
@ -226,18 +226,19 @@ func main() {
|
|||
|
||||
stats, err := statsd.NewClient(c.StatsdServer, c.StatsdPrefix)
|
||||
cmd.FailOnError(err, "Failed to create StatsD client")
|
||||
scope := metrics.NewStatsdScope(stats, "caa-service")
|
||||
|
||||
resolver := bdns.NewDNSResolverImpl(
|
||||
c.DNSTimeout.Duration,
|
||||
[]string{c.DNSResolver},
|
||||
metrics.NewStatsdScope(stats, "caa-service"),
|
||||
scope,
|
||||
clock.Default(),
|
||||
5,
|
||||
)
|
||||
|
||||
s, l, err := bgrpc.NewServer(&c.GRPC)
|
||||
s, l, err := bgrpc.NewServer(&c.GRPC, scope)
|
||||
cmd.FailOnError(err, "Failed to setup gRPC server")
|
||||
ccs := &caaCheckerServer{resolver, stats}
|
||||
ccs := &caaCheckerServer{resolver, scope}
|
||||
pb.RegisterCAACheckerServer(s, ccs)
|
||||
err = s.Serve(l)
|
||||
cmd.FailOnError(err, "gRPC service failed")
|
||||
|
|
|
@ -3,11 +3,11 @@ package main
|
|||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/cactus/go-statsd-client/statsd"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/letsencrypt/boulder/bdns"
|
||||
pb "github.com/letsencrypt/boulder/cmd/caa-checker/proto"
|
||||
"github.com/letsencrypt/boulder/metrics"
|
||||
"github.com/letsencrypt/boulder/test"
|
||||
)
|
||||
|
||||
|
@ -42,7 +42,7 @@ func TestChecking(t *testing.T) {
|
|||
{"unsatisfiable.com", true, false},
|
||||
}
|
||||
|
||||
stats, _ := statsd.NewNoopClient()
|
||||
stats := metrics.NewNoopScope()
|
||||
ccs := &caaCheckerServer{&bdns.MockDNSResolver{}, stats}
|
||||
issuerDomain := "letsencrypt.org"
|
||||
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/letsencrypt/boulder/metrics"
|
||||
|
||||
"github.com/jmhodges/clock"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type serverInterceptor struct {
|
||||
stats metrics.Scope
|
||||
clk clock.Clock
|
||||
}
|
||||
|
||||
func (si *serverInterceptor) intercept(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
if info == nil {
|
||||
si.stats.Inc("gRPC.NoInfo", 1)
|
||||
return nil, errors.New("passed nil *grpc.UnaryServerInfo")
|
||||
}
|
||||
si.stats.Inc(fmt.Sprintf("gRPC.%s", info.FullMethod), 1)
|
||||
si.stats.GaugeDelta(fmt.Sprintf("gRPC.%s.InProgress", info.FullMethod), 1)
|
||||
s := si.clk.Now()
|
||||
resp, err := handler(ctx, req)
|
||||
si.stats.TimingDuration(fmt.Sprintf("gRPC.%s", info.FullMethod), si.clk.Now().Sub(s))
|
||||
si.stats.GaugeDelta(fmt.Sprintf("gRPC.%s.InProgress", info.FullMethod), -1)
|
||||
if err != nil {
|
||||
si.stats.Inc(fmt.Sprintf("gRPC.%s.Failed", info.FullMethod), 1)
|
||||
}
|
||||
return resp, err
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/jmhodges/clock"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/letsencrypt/boulder/metrics"
|
||||
"github.com/letsencrypt/boulder/test"
|
||||
)
|
||||
|
||||
var fc = clock.NewFake()
|
||||
|
||||
func testHandler(_ context.Context, i interface{}) (interface{}, error) {
|
||||
if i != nil {
|
||||
return nil, errors.New("")
|
||||
}
|
||||
fc.Sleep(time.Second)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestServerInterceptor(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
statter := metrics.NewMockStatter(ctrl)
|
||||
stats := metrics.NewStatsdScope(statter, "fake")
|
||||
si := serverInterceptor{stats, fc}
|
||||
|
||||
statter.EXPECT().Inc("fake.gRPC.NoInfo", int64(1), float32(1.0)).Return(nil)
|
||||
_, err := si.intercept(context.Background(), nil, nil, testHandler)
|
||||
test.AssertError(t, err, "si.intercept didn't fail with a nil grpc.UnaryServerInfo")
|
||||
|
||||
statter.EXPECT().Inc("fake.gRPC.test", int64(1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().GaugeDelta("fake.gRPC.test.InProgress", int64(1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().TimingDuration("fake.gRPC.test", time.Second, float32(1.0)).Return(nil)
|
||||
statter.EXPECT().GaugeDelta("fake.gRPC.test.InProgress", int64(-1), float32(1.0)).Return(nil)
|
||||
_, err = si.intercept(context.Background(), nil, &grpc.UnaryServerInfo{FullMethod: "test"}, testHandler)
|
||||
test.AssertNotError(t, err, "si.intercept failed with a non-nil grpc.UnaryServerInfo")
|
||||
|
||||
statter.EXPECT().Inc("fake.gRPC.broke-test", int64(1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().GaugeDelta("fake.gRPC.broke-test.InProgress", int64(1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().TimingDuration("fake.gRPC.broke-test", time.Duration(0), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().GaugeDelta("fake.gRPC.broke-test.InProgress", int64(-1), float32(1.0)).Return(nil)
|
||||
statter.EXPECT().Inc("fake.gRPC.broke-test.Failed", int64(1), float32(1.0)).Return(nil)
|
||||
_, err = si.intercept(context.Background(), 0, &grpc.UnaryServerInfo{FullMethod: "broke-test"}, testHandler)
|
||||
test.AssertError(t, err, "si.intercept didn't fail when handler returned a error")
|
||||
}
|
|
@ -8,11 +8,13 @@ import (
|
|||
"io/ioutil"
|
||||
"net"
|
||||
|
||||
"github.com/jmhodges/clock"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
||||
"github.com/letsencrypt/boulder/cmd"
|
||||
bcreds "github.com/letsencrypt/boulder/grpc/creds"
|
||||
"github.com/letsencrypt/boulder/metrics"
|
||||
)
|
||||
|
||||
// CodedError is a alias required to appease go vet
|
||||
|
@ -50,7 +52,7 @@ func ClientSetup(c *cmd.GRPCClientConfig) (*grpc.ClientConn, error) {
|
|||
// gRPC Server that verifies the client certificate was
|
||||
// issued by the provided issuer certificate and presents a
|
||||
// a server TLS certificate.
|
||||
func NewServer(c *cmd.GRPCServerConfig) (*grpc.Server, net.Listener, error) {
|
||||
func NewServer(c *cmd.GRPCServerConfig, stats metrics.Scope) (*grpc.Server, net.Listener, error) {
|
||||
cert, err := tls.LoadX509KeyPair(c.ServerCertificatePath, c.ServerKeyPath)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -73,5 +75,6 @@ func NewServer(c *cmd.GRPCServerConfig) (*grpc.Server, net.Listener, error) {
|
|||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return grpc.NewServer(grpc.Creds(creds)), l, nil
|
||||
si := &serverInterceptor{stats, clock.Default()}
|
||||
return grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(si.intercept)), l, nil
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
//go:generate mockgen -package metrics -destination ./mock_statsd_test.go github.com/cactus/go-statsd-client/statsd Statter
|
||||
//go:generate mockgen -package metrics -destination ./mock_statsd.go github.com/cactus/go-statsd-client/statsd Statter
|
||||
|
||||
package metrics
|
||||
|
||||
|
|
Loading…
Reference in New Issue