Disable fail-fast for gRPC. (#2397)

This allows us to restart backends with relatively little interruption in
service, provided the backends come up promptly.

Fixes #2389 and #2408
Jacob Hoffman-Andrews 2016-12-09 12:03:45 -08:00 committed by Roland Bracewell Shoemaker
parent 3cff6babb3
commit 5b865f1d63
7 changed files with 237 additions and 3 deletions
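For orientation before the per-file diff: with fail-fast disabled, a unary RPC issued while every backend is down queues until a backend becomes READY or the Context deadline expires, instead of erroring immediately. A minimal sketch of that call pattern, not part of the diff — the method name comes from the test proto added in this commit, conn is an established *grpc.ClientConn, and the 15-second deadline is illustrative:

// With grpc.FailFast(false), this call waits out a backend restart (up to
// the deadline) rather than failing as soon as no backend is available.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
in, out := &test_proto.Time{}, &test_proto.Time{}
err := grpc.Invoke(ctx, "/Chiller/Chill", in, out, conn, grpc.FailFast(false))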


@@ -13,6 +13,8 @@ import (
"google.golang.org/grpc"
)
// serverInterceptor is a gRPC interceptor that adds statsd and Prometheus
// metrics to requests handled by a gRPC server.
type serverInterceptor struct {
stats metrics.Scope
clk clock.Clock
@@ -48,6 +50,13 @@ func (si *serverInterceptor) intercept(ctx context.Context, req interface{}, inf
return resp, err
}
// clientInterceptor is a gRPC interceptor that adds statsd and Prometheus
// metrics to sent requests, and disables FailFast. We disable FailFast because
// non-FailFast mode is most similar to the old AMQP RPC layer: If a client
// makes a request while all backends are briefly down (e.g. for a restart), the
// request doesn't necessarily fail. A backend can service the request if it
// comes back up within the timeout. Under gRPC the same effect is achieved by
// retries up to the Context deadline.
type clientInterceptor struct {
stats metrics.Scope
clk clock.Clock
@@ -70,6 +79,9 @@ func (ci *clientInterceptor) intercept(
methodScope := ci.stats.NewScope(cleanMethod(method, false))
methodScope.Inc("Calls", 1)
methodScope.GaugeDelta("InProgress", 1)
// Disable fail-fast so RPCs will retry until deadline, even if all backends
// are down.
opts = append(opts, grpc.FailFast(false))
err := grpc_prometheus.UnaryClientInterceptor(localCtx, method, req, reply, cc, invoker, opts...)
methodScope.TimingDuration("Latency", ci.clk.Since(s))
methodScope.GaugeDelta("InProgress", -1)


@@ -10,6 +10,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/letsencrypt/boulder/grpc/test_proto"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/test"
)
@@ -24,7 +25,7 @@ func testHandler(_ context.Context, i interface{}) (interface{}, error) {
return nil, nil
}
func testInvoker(_ context.Context, method string, _, _ interface{}, _ *grpc.ClientConn, _ ...grpc.CallOption) error {
func testInvoker(_ context.Context, method string, _, _ interface{}, _ *grpc.ClientConn, opts ...grpc.CallOption) error {
if method == "-service-brokeTest" {
return errors.New("")
}
@@ -82,6 +83,44 @@ func TestClientInterceptor(t *testing.T) {
test.AssertError(t, err, "ci.intercept didn't fail when handler returned an error")
}
// testServer is used to implement InterceptorTest
type testServer struct{}
// Chill implements InterceptorTest.Chill
func (s *testServer) Chill(ctx context.Context, in *test_proto.Time) (*test_proto.Time, error) {
start := time.Now()
time.Sleep(time.Duration(*in.Time) * time.Nanosecond)
spent := int64(time.Since(start) / time.Nanosecond)
return &test_proto.Time{Time: &spent}, nil
}
// TestFailFastFalse sends a gRPC request to a backend that is
// unavailable, and ensures that the request doesn't error out until the
// timeout is reached, i.e. that FailFast is set to false.
// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
func TestFailFastFalse(t *testing.T) {
ci := &clientInterceptor{metrics.NewNoopScope(), clock.Default(), 100 * time.Millisecond}
conn, err := grpc.Dial("localhost:19876", // random, probably unused port
grpc.WithInsecure(),
grpc.WithBalancer(grpc.RoundRobin(newStaticResolver([]string{"localhost:19000"}))),
grpc.WithUnaryInterceptor(ci.intercept))
if err != nil {
t.Fatalf("did not connect: %v", err)
}
c := test_proto.NewChillerClient(conn)
start := time.Now()
var second int64 = time.Second.Nanoseconds()
_, err = c.Chill(context.Background(), &test_proto.Time{Time: &second})
if err == nil {
t.Errorf("Successful Chill when we expected failure.")
}
if time.Since(start) < 90*time.Millisecond {
t.Errorf("Chill failed fast, when FailFast should be disabled.")
}
_ = conn.Close()
}
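newStaticResolver, used above, is defined elsewhere in this package. Roughly, it implements the google.golang.org/grpc/naming interfaces (the resolver API of this grpc-go vintage, since deprecated) by emitting one fixed address set and then blocking. A hypothetical sketch of that shape; the real helper may differ in detail:

import (
	"errors"

	"google.golang.org/grpc/naming"
)

// newStaticResolver returns a naming.Resolver that always reports the given
// addresses.
func newStaticResolver(addrs []string) naming.Resolver {
	return &staticResolver{addrs: addrs}
}

type staticResolver struct{ addrs []string }

func (sr *staticResolver) Resolve(target string) (naming.Watcher, error) {
	return &staticWatcher{addrs: sr.addrs, done: make(chan struct{})}, nil
}

type staticWatcher struct {
	addrs []string
	sent  bool
	done  chan struct{}
}

// Next returns the fixed address set once, then blocks until Close, so the
// balancer never sees the set change.
func (sw *staticWatcher) Next() ([]*naming.Update, error) {
	if !sw.sent {
		sw.sent = true
		updates := make([]*naming.Update, 0, len(sw.addrs))
		for _, a := range sw.addrs {
			updates = append(updates, &naming.Update{Op: naming.Add, Addr: a})
		}
		return updates, nil
	}
	<-sw.done
	return nil, errors.New("watcher closed")
}

func (sw *staticWatcher) Close() { close(sw.done) }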
func TestCleanMethod(t *testing.T) {
tests := []struct {
in string


@@ -0,0 +1,3 @@
package test_proto
//go:generate sh -c "cd ../.. && protoc --go_out=plugins=grpc,Mcore/proto/core.proto=github.com/letsencrypt/boulder/grpc/test_proto:. grpc/test_proto/interceptors_test.proto"
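With protoc and protoc-gen-go on the PATH, regenerating the stubs is then just "go generate ./grpc/test_proto/..." from the repository root; the directive changes into the repo root itself before invoking protoc.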


@@ -0,0 +1,143 @@
// Code generated by protoc-gen-go.
// source: grpc/test_proto/interceptors_test.proto
// DO NOT EDIT!
/*
Package test_proto is a generated protocol buffer package.
It is generated from these files:
grpc/test_proto/interceptors_test.proto
It has these top-level messages:
Time
*/
package test_proto
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Time struct {
Time *int64 `protobuf:"varint,1,opt,name=time" json:"time,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Time) Reset() { *m = Time{} }
func (m *Time) String() string { return proto.CompactTextString(m) }
func (*Time) ProtoMessage() {}
func (*Time) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *Time) GetTime() int64 {
if m != nil && m.Time != nil {
return *m.Time
}
return 0
}
func init() {
proto.RegisterType((*Time)(nil), "Time")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion3
// Client API for Chiller service
type ChillerClient interface {
// Sleep for the given amount of time, and return the amount of time slept.
Chill(ctx context.Context, in *Time, opts ...grpc.CallOption) (*Time, error)
}
type chillerClient struct {
cc *grpc.ClientConn
}
func NewChillerClient(cc *grpc.ClientConn) ChillerClient {
return &chillerClient{cc}
}
func (c *chillerClient) Chill(ctx context.Context, in *Time, opts ...grpc.CallOption) (*Time, error) {
out := new(Time)
err := grpc.Invoke(ctx, "/Chiller/Chill", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Chiller service
type ChillerServer interface {
// Sleep for the given amount of time, and return the amount of time slept.
Chill(context.Context, *Time) (*Time, error)
}
func RegisterChillerServer(s *grpc.Server, srv ChillerServer) {
s.RegisterService(&_Chiller_serviceDesc, srv)
}
func _Chiller_Chill_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Time)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ChillerServer).Chill(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/Chiller/Chill",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ChillerServer).Chill(ctx, req.(*Time))
}
return interceptor(ctx, in, info, handler)
}
var _Chiller_serviceDesc = grpc.ServiceDesc{
ServiceName: "Chiller",
HandlerType: (*ChillerServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Chill",
Handler: _Chiller_Chill_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: fileDescriptor0,
}
func init() { proto.RegisterFile("grpc/test_proto/interceptors_test.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 114 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x52, 0x4f, 0x2f, 0x2a, 0x48,
0xd6, 0x2f, 0x49, 0x2d, 0x2e, 0x89, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0xd7, 0xcf, 0xcc, 0x2b, 0x49,
0x2d, 0x4a, 0x4e, 0x2d, 0x28, 0xc9, 0x2f, 0x2a, 0x8e, 0x07, 0x89, 0xeb, 0x81, 0xc5, 0x95, 0x44,
0xb8, 0x58, 0x42, 0x32, 0x73, 0x53, 0x85, 0x78, 0xb8, 0x58, 0x4a, 0x32, 0x73, 0x53, 0x25, 0x18,
0x15, 0x18, 0x35, 0x98, 0x8d, 0x94, 0xb8, 0xd8, 0x9d, 0x33, 0x32, 0x73, 0x72, 0x52, 0x8b, 0x84,
0xc4, 0xb9, 0x58, 0xc1, 0x4c, 0x21, 0x56, 0x3d, 0x90, 0x42, 0x29, 0x08, 0xa5, 0xc4, 0xe0, 0xc4,
0x13, 0xc5, 0x85, 0x30, 0x1f, 0x10, 0x00, 0x00, 0xff, 0xff, 0x39, 0x6f, 0xd5, 0x15, 0x71, 0x00,
0x00, 0x00,
}
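The test above never starts a live backend (it dials an unreachable port on purpose), so for orientation only, a minimal sketch of serving the generated Chiller interface — testServer is the type from the test file, the listen address is illustrative, and imports (net, log, google.golang.org/grpc) are assumed:

lis, err := net.Listen("tcp", "localhost:19000") // illustrative address
if err != nil {
	log.Fatal(err)
}
s := grpc.NewServer()
test_proto.RegisterChillerServer(s, &testServer{})
log.Fatal(s.Serve(lis)) // blocks, handling Chill RPCs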


@@ -0,0 +1,12 @@
syntax = "proto2";
option go_package = "test_proto";
service Chiller {
// Sleep for the given amount of time, and return the amount of time slept.
rpc Chill(Time) returns (Time) {}
}
message Time {
optional int64 time = 1; // In nanoseconds
}


@@ -536,6 +536,31 @@ func NewAmqpRPCClient(
return nil, err
}
// Ensure that the queue we want to write to exists on the AMQP server. This
// avoids some test flakiness depending on the start order of the Boulder
// components (https://github.com/letsencrypt/boulder/issues/2294)
_, err = rpc.connection.channel.QueueDeclare(
rpcConf.Server,
AmqpDurable,
AmqpDeleteUnused,
AmqpExclusive,
AmqpNoWait,
nil)
if err != nil {
return nil, fmt.Errorf("declaring queue %s: %s", rpcConf.Server, err)
}
// Make sure that messages with the appropriate routing key get routed to the
// server's queue.
err = rpc.connection.channel.QueueBind(
rpcConf.Server,
rpcConf.Server,
AmqpExchange,
false,
nil)
if err != nil {
return nil, fmt.Errorf("binding routing key %s", err)
}
go func() {
for {
select {
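For readers unfamiliar with the streadway/amqp API these calls wrap, the declare-then-bind pattern in isolation looks roughly like this; the queue and exchange names are placeholders, not Boulder's configuration, and imports (fmt, github.com/streadway/amqp) are assumed:

func declareAndBind(conn *amqp.Connection, queue, exchange string) error {
	ch, err := conn.Channel()
	if err != nil {
		return err
	}
	// QueueDeclare(name, durable, autoDelete, exclusive, noWait, args).
	// Redeclaring an existing queue with identical arguments is a no-op, so
	// the client can do this safely regardless of component start order.
	if _, err := ch.QueueDeclare(queue, true, false, false, false, nil); err != nil {
		return fmt.Errorf("declaring queue %s: %s", queue, err)
	}
	// QueueBind(queueName, routingKey, exchange, noWait, args): deliver
	// messages published with the queue's name as routing key to the queue.
	if err := ch.QueueBind(queue, queue, exchange, false, nil); err != nil {
		return fmt.Errorf("binding routing key: %s", err)
	}
	return nil
}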


@@ -46,7 +46,6 @@ def start(race_detection):
global processes
forward()
progs = [
'boulder-sa --config %s' % os.path.join(default_config_dir, "sa.json"),
'boulder-wfe --config %s' % os.path.join(default_config_dir, "wfe.json"),
'boulder-ra --config %s' % os.path.join(default_config_dir, "ra.json"),
'boulder-ca --config %s' % os.path.join(default_config_dir, "ca.json"),
@@ -56,7 +55,8 @@ def start(race_detection):
'ocsp-responder --config %s' % os.path.join(default_config_dir, "ocsp-responder.json"),
'ct-test-srv',
'dns-test-srv',
'mail-test-srv --closeFirst 5'
'mail-test-srv --closeFirst 5',
'boulder-sa --config %s' % os.path.join(default_config_dir, "sa.json")
]
if not install(race_detection):
return False