From 5b865f1d6311c83d11767ab2910eb3d608671216 Mon Sep 17 00:00:00 2001
From: Jacob Hoffman-Andrews
Date: Fri, 9 Dec 2016 12:03:45 -0800
Subject: [PATCH] Disable fail-fast for gRPC. (#2397)

This allows us to restart backends with relatively little interruption in
service, provided the backends come up promptly.

Fixes #2389 and #2408
---
 grpc/interceptors.go                    |  12 ++
 grpc/interceptors_test.go               |  41 ++++++-
 grpc/test_proto/generate.go             |   3 +
 grpc/test_proto/interceptors_test.pb.go | 143 ++++++++++++++++++++++++
 grpc/test_proto/interceptors_test.proto |  12 ++
 rpc/amqp-rpc.go                         |  25 +++++
 test/startservers.py                    |   4 +-
 7 files changed, 237 insertions(+), 3 deletions(-)
 create mode 100644 grpc/test_proto/generate.go
 create mode 100644 grpc/test_proto/interceptors_test.pb.go
 create mode 100644 grpc/test_proto/interceptors_test.proto

diff --git a/grpc/interceptors.go b/grpc/interceptors.go
index 908dc87bb..a5699208e 100644
--- a/grpc/interceptors.go
+++ b/grpc/interceptors.go
@@ -13,6 +13,8 @@ import (
 	"google.golang.org/grpc"
 )
 
+// serverInterceptor is a gRPC interceptor that adds statsd and Prometheus
+// metrics to requests handled by a gRPC server.
 type serverInterceptor struct {
 	stats metrics.Scope
 	clk   clock.Clock
@@ -48,6 +50,13 @@ func (si *serverInterceptor) intercept(ctx context.Context, req interface{}, inf
 	return resp, err
 }
 
+// clientInterceptor is a gRPC interceptor that adds statsd and Prometheus
+// metrics to sent requests, and disables FailFast. We disable FailFast because
+// non-FailFast mode is most similar to the old AMQP RPC layer: If a client
+// makes a request while all backends are briefly down (e.g. for a restart), the
+// request doesn't necessarily fail. A backend can service the request if it
+// comes back up within the timeout. Under gRPC the same effect is achieved by
+// retries up to the Context deadline.
 type clientInterceptor struct {
 	stats metrics.Scope
 	clk   clock.Clock
@@ -70,6 +79,9 @@ func (ci *clientInterceptor) intercept(
 	methodScope := ci.stats.NewScope(cleanMethod(method, false))
 	methodScope.Inc("Calls", 1)
 	methodScope.GaugeDelta("InProgress", 1)
+	// Disable fail-fast so RPCs will retry until deadline, even if all backends
+	// are down.
+	opts = append(opts, grpc.FailFast(false))
 	err := grpc_prometheus.UnaryClientInterceptor(localCtx, method, req, reply, cc, invoker, opts...)
 	methodScope.TimingDuration("Latency", ci.clk.Since(s))
 	methodScope.GaugeDelta("InProgress", -1)
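
An illustrative aside, not part of the commit: a minimal sketch of what disabling fail-fast buys a caller, written against the test-only ChillerClient that this patch generates below. The helper name and the 15-second window are hypothetical; the imports (time, golang.org/x/net/context, test_proto) are assumed.

// chillOnce is a hypothetical helper, not part of this patch. Because
// clientInterceptor appends grpc.FailFast(false) to every call, an RPC
// issued while all backends are down is not rejected immediately; it waits
// for a backend to become ready and fails only when the deadline expires.
func chillOnce(c test_proto.ChillerClient) error {
	// The Context deadline bounds the retry window for the non-fail-fast RPC.
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	nanos := (50 * time.Millisecond).Nanoseconds()
	_, err := c.Chill(ctx, &test_proto.Time{Time: &nanos})
	return err
}

In fail-fast mode the same call would error out as soon as no connection was ready; with it disabled, a backend restart inside the deadline is invisible to the caller apart from added latency.
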
diff --git a/grpc/interceptors_test.go b/grpc/interceptors_test.go
index aecedbada..d18512a1e 100644
--- a/grpc/interceptors_test.go
+++ b/grpc/interceptors_test.go
@@ -10,6 +10,7 @@ import (
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 
+	"github.com/letsencrypt/boulder/grpc/test_proto"
 	"github.com/letsencrypt/boulder/metrics"
 	"github.com/letsencrypt/boulder/test"
 )
@@ -24,7 +25,7 @@ func testHandler(_ context.Context, i interface{}) (interface{}, error) {
 	return nil, nil
 }
 
-func testInvoker(_ context.Context, method string, _, _ interface{}, _ *grpc.ClientConn, _ ...grpc.CallOption) error {
+func testInvoker(_ context.Context, method string, _, _ interface{}, _ *grpc.ClientConn, opts ...grpc.CallOption) error {
 	if method == "-service-brokeTest" {
 		return errors.New("")
 	}
@@ -82,6 +83,44 @@ func TestClientInterceptor(t *testing.T) {
 	test.AssertError(t, err, "ci.intercept didn't fail when handler returned a error")
 }
 
+// testServer is used to implement InterceptorTest
+type testServer struct{}
+
+// Chill implements InterceptorTest.Chill
+func (s *testServer) Chill(ctx context.Context, in *test_proto.Time) (*test_proto.Time, error) {
+	start := time.Now()
+	time.Sleep(time.Duration(*in.Time) * time.Nanosecond)
+	spent := int64(time.Since(start) / time.Nanosecond)
+	return &test_proto.Time{Time: &spent}, nil
+}
+
+// TestFailFastFalse sends a gRPC request to a backend that is
+// unavailable, and ensures that the request doesn't error out until the
+// timeout is reached, i.e. that FailFast is set to false.
+// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
+func TestFailFastFalse(t *testing.T) {
+	ci := &clientInterceptor{metrics.NewNoopScope(), clock.Default(), 100 * time.Millisecond}
+	conn, err := grpc.Dial("localhost:19876", // random, probably unused port
+		grpc.WithInsecure(),
+		grpc.WithBalancer(grpc.RoundRobin(newStaticResolver([]string{"localhost:19000"}))),
+		grpc.WithUnaryInterceptor(ci.intercept))
+	if err != nil {
+		t.Fatalf("did not connect: %v", err)
+	}
+	c := test_proto.NewChillerClient(conn)
+
+	start := time.Now()
+	var second int64 = time.Second.Nanoseconds()
+	_, err = c.Chill(context.Background(), &test_proto.Time{Time: &second})
+	if err == nil {
+		t.Errorf("Successful Chill when we expected failure.")
+	}
+	if time.Since(start) < 90*time.Millisecond {
+		t.Errorf("Chill failed fast, when FailFast should be disabled.")
+	}
+	_ = conn.Close()
+}
+
 func TestCleanMethod(t *testing.T) {
 	tests := []struct {
 		in string
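
A hedged companion sketch, not in the commit: the recovery case the clientInterceptor comment promises, where a backend that comes up within the timeout still serves the request. The port and delays are arbitrary, and the imports are assumed to be those of interceptors_test.go plus net.

// TestChillRecovery is a sketch, not part of this patch: the client sends
// its RPC while nothing is listening, the backend comes up 200ms later, and
// the call succeeds because FailFast is disabled and the 2s timeout has not
// yet expired.
func TestChillRecovery(t *testing.T) {
	const addr = "localhost:19877" // arbitrary, assumed-free port

	// Bring the backend up only after the client's RPC is already in flight.
	go func() {
		time.Sleep(200 * time.Millisecond)
		lis, err := net.Listen("tcp", addr)
		if err != nil {
			return
		}
		s := grpc.NewServer()
		test_proto.RegisterChillerServer(s, &testServer{})
		_ = s.Serve(lis)
	}()

	ci := &clientInterceptor{metrics.NewNoopScope(), clock.Default(), 2 * time.Second}
	conn, err := grpc.Dial(addr,
		grpc.WithInsecure(),
		grpc.WithUnaryInterceptor(ci.intercept))
	if err != nil {
		t.Fatalf("did not connect: %v", err)
	}
	defer func() { _ = conn.Close() }()
	c := test_proto.NewChillerClient(conn)

	nanos := (10 * time.Millisecond).Nanoseconds()
	if _, err := c.Chill(context.Background(), &test_proto.Time{Time: &nanos}); err != nil {
		t.Errorf("Chill failed although the backend came up within the timeout: %v", err)
	}
}
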
diff --git a/grpc/test_proto/generate.go b/grpc/test_proto/generate.go
new file mode 100644
index 000000000..0d4936f71
--- /dev/null
+++ b/grpc/test_proto/generate.go
@@ -0,0 +1,3 @@
+package test_proto
+
+//go:generate sh -c "cd ../.. && protoc --go_out=plugins=grpc,Mcore/proto/core.proto=github.com/letsencrypt/boulder/grpc/test_proto:. grpc/test_proto/interceptors_test.proto"
diff --git a/grpc/test_proto/interceptors_test.pb.go b/grpc/test_proto/interceptors_test.pb.go
new file mode 100644
index 000000000..aec62b1f4
--- /dev/null
+++ b/grpc/test_proto/interceptors_test.pb.go
@@ -0,0 +1,143 @@
+// Code generated by protoc-gen-go.
+// source: grpc/test_proto/interceptors_test.proto
+// DO NOT EDIT!
+
+/*
+Package test_proto is a generated protocol buffer package.
+
+It is generated from these files:
+	grpc/test_proto/interceptors_test.proto
+
+It has these top-level messages:
+	Time
+*/
+package test_proto
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+import (
+	context "golang.org/x/net/context"
+	grpc "google.golang.org/grpc"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type Time struct {
+	Time             *int64 `protobuf:"varint,1,opt,name=time" json:"time,omitempty"`
+	XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *Time) Reset()                    { *m = Time{} }
+func (m *Time) String() string            { return proto.CompactTextString(m) }
+func (*Time) ProtoMessage()               {}
+func (*Time) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
+
+func (m *Time) GetTime() int64 {
+	if m != nil && m.Time != nil {
+		return *m.Time
+	}
+	return 0
+}
+
+func init() {
+	proto.RegisterType((*Time)(nil), "Time")
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConn
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion3
+
+// Client API for Chiller service
+
+type ChillerClient interface {
+	// Sleep for the given amount of time, and return the amount of time slept.
+	Chill(ctx context.Context, in *Time, opts ...grpc.CallOption) (*Time, error)
+}
+
+type chillerClient struct {
+	cc *grpc.ClientConn
+}
+
+func NewChillerClient(cc *grpc.ClientConn) ChillerClient {
+	return &chillerClient{cc}
+}
+
+func (c *chillerClient) Chill(ctx context.Context, in *Time, opts ...grpc.CallOption) (*Time, error) {
+	out := new(Time)
+	err := grpc.Invoke(ctx, "/Chiller/Chill", in, out, c.cc, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// Server API for Chiller service
+
+type ChillerServer interface {
+	// Sleep for the given amount of time, and return the amount of time slept.
+	Chill(context.Context, *Time) (*Time, error)
+}
+
+func RegisterChillerServer(s *grpc.Server, srv ChillerServer) {
+	s.RegisterService(&_Chiller_serviceDesc, srv)
+}
+
+func _Chiller_Chill_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(Time)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(ChillerServer).Chill(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/Chiller/Chill",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(ChillerServer).Chill(ctx, req.(*Time))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+var _Chiller_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "Chiller",
+	HandlerType: (*ChillerServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "Chill",
+			Handler:    _Chiller_Chill_Handler,
+		},
+	},
+	Streams:  []grpc.StreamDesc{},
+	Metadata: fileDescriptor0,
+}
+
+func init() { proto.RegisterFile("grpc/test_proto/interceptors_test.proto", fileDescriptor0) }
+
+var fileDescriptor0 = []byte{
+	// 114 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x52, 0x4f, 0x2f, 0x2a, 0x48,
+	0xd6, 0x2f, 0x49, 0x2d, 0x2e, 0x89, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0xd7, 0xcf, 0xcc, 0x2b, 0x49,
+	0x2d, 0x4a, 0x4e, 0x2d, 0x28, 0xc9, 0x2f, 0x2a, 0x8e, 0x07, 0x89, 0xeb, 0x81, 0xc5, 0x95, 0x44,
+	0xb8, 0x58, 0x42, 0x32, 0x73, 0x53, 0x85, 0x78, 0xb8, 0x58, 0x4a, 0x32, 0x73, 0x53, 0x25, 0x18,
+	0x15, 0x18, 0x35, 0x98, 0x8d, 0x94, 0xb8, 0xd8, 0x9d, 0x33, 0x32, 0x73, 0x72, 0x52, 0x8b, 0x84,
+	0xc4, 0xb9, 0x58, 0xc1, 0x4c, 0x21, 0x56, 0x3d, 0x90, 0x42, 0x29, 0x08, 0xa5, 0xc4, 0xe0, 0xc4,
+	0x13, 0xc5, 0x85, 0x30, 0x1f, 0x10, 0x00, 0x00, 0xff, 0xff, 0x39, 0x6f, 0xd5, 0x15, 0x71, 0x00,
+	0x00, 0x00,
+}
diff --git a/grpc/test_proto/interceptors_test.proto b/grpc/test_proto/interceptors_test.proto
new file mode 100644
index 000000000..c83051158
--- /dev/null
+++ b/grpc/test_proto/interceptors_test.proto
@@ -0,0 +1,12 @@
+syntax = "proto2";
+
+option go_package = "test_proto";
+
+service Chiller {
+  // Sleep for the given amount of time, and return the amount of time slept.
+  rpc Chill(Time) returns (Time) {}
+}
+
+message Time {
+  optional int64 time = 1; // In nanoseconds
+}
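
A note on the proto2 choice, with a small standalone sketch (not part of the commit): `optional int64` makes protoc-gen-go emit a pointer-typed field, which is why the tests take the address of a local int64. The stock proto.Int64 helper does the same thing, and the generated getter is nil-safe.

package main

import (
	"fmt"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/letsencrypt/boulder/grpc/test_proto"
)

func main() {
	// `optional int64 time = 1` becomes the field `Time *int64`; proto.Int64
	// returns a pointer to its argument.
	msg := &test_proto.Time{Time: proto.Int64(time.Second.Nanoseconds())}
	// GetTime is nil-safe: it returns 0 when the field (or message) is unset.
	fmt.Println(msg.GetTime(), new(test_proto.Time).GetTime())
}
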
diff --git a/rpc/amqp-rpc.go b/rpc/amqp-rpc.go
index 89672f9b2..94413bc2f 100644
--- a/rpc/amqp-rpc.go
+++ b/rpc/amqp-rpc.go
@@ -536,6 +536,31 @@ func NewAmqpRPCClient(
 		return nil, err
 	}
 
+	// Ensure that the queue we want to write to exists on the AMQP server. This
+	// avoids some test flakiness depending on the start order of the Boulder
+	// components (https://github.com/letsencrypt/boulder/issues/2294)
+	_, err = rpc.connection.channel.QueueDeclare(
+		rpcConf.Server,
+		AmqpDurable,
+		AmqpDeleteUnused,
+		AmqpExclusive,
+		AmqpNoWait,
+		nil)
+	if err != nil {
+		return nil, fmt.Errorf("declaring queue %s: %s", rpcConf.Server, err)
+	}
+	// Make sure that messages with the appropriate routing key get routed to the
+	// server's queue.
+	err = rpc.connection.channel.QueueBind(
+		rpcConf.Server,
+		rpcConf.Server,
+		AmqpExchange,
+		false,
+		nil)
+	if err != nil {
+		return nil, fmt.Errorf("binding routing key: %s", err)
+	}
+
 	go func() {
 		for {
 			select {
diff --git a/test/startservers.py b/test/startservers.py
index d949eca1d..a44aa990a 100644
--- a/test/startservers.py
+++ b/test/startservers.py
@@ -46,7 +46,6 @@ def start(race_detection):
     global processes
     forward()
     progs = [
-        'boulder-sa --config %s' % os.path.join(default_config_dir, "sa.json"),
         'boulder-wfe --config %s' % os.path.join(default_config_dir, "wfe.json"),
         'boulder-ra --config %s' % os.path.join(default_config_dir, "ra.json"),
         'boulder-ca --config %s' % os.path.join(default_config_dir, "ca.json"),
@@ -56,7 +55,8 @@
         'ocsp-responder --config %s' % os.path.join(default_config_dir, "ocsp-responder.json"),
         'ct-test-srv',
         'dns-test-srv',
-        'mail-test-srv --closeFirst 5'
+        'mail-test-srv --closeFirst 5',
+        'boulder-sa --config %s' % os.path.join(default_config_dir, "sa.json")
     ]
     if not install(race_detection):
         return False