From 33b85530fb85055187ef8599a825b940f2dc1695 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Mon, 15 Jun 2015 16:37:18 -0700 Subject: [PATCH] add HealthCheck feature and corresponding end2end tests --- health.go | 56 ++++++++++++++++++++++++++++++ test/end2end_test.go | 82 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 122 insertions(+), 16 deletions(-) create mode 100644 health.go diff --git a/health.go b/health.go new file mode 100644 index 000000000..23ab10faf --- /dev/null +++ b/health.go @@ -0,0 +1,56 @@ +package grpc + +import proto "github.com/golang/protobuf/proto" +import ( + "time" + + "golang.org/x/net/context" +) + +type HealthCheckRequest struct { +} + +func (m *HealthCheckRequest) Reset() { *m = HealthCheckRequest{} } +func (m *HealthCheckRequest) String() string { return proto.CompactTextString(m) } +func (*HealthCheckRequest) ProtoMessage() {} + +type HealthCheckResponse struct { +} + +func (m *HealthCheckResponse) Reset() { *m = HealthCheckResponse{} } +func (m *HealthCheckResponse) String() string { return proto.CompactTextString(m) } +func (*HealthCheckResponse) ProtoMessage() {} + +var _HealthCheck_serviceDesc = ServiceDesc{ + ServiceName: "grpc.HealthCheck", + HandlerType: nil, + Methods: []MethodDesc{ + { + MethodName: "Check", + Handler: _HealthCheck_Handler, + }, + }, + Streams: []StreamDesc{}, +} + +func Enable(s *Server) { + s.register(&_HealthCheck_serviceDesc, nil) +} + +func _HealthCheck_Handler(srv interface{}, ctx context.Context, codec Codec, buf []byte) (interface{}, error) { + in := new(HealthCheckRequest) + if err := codec.Unmarshal(buf, in); err != nil { + return nil, err + } + out := new(HealthCheckResponse) + return out, nil +} + +func Check(t time.Duration, cc *ClientConn, in *HealthCheckRequest) error { + ctx, _ := context.WithTimeout(context.Background(), t) + out := new(HealthCheckResponse) + if err := Invoke(ctx, "grpc.HealthCheck/Check", in, out, cc); err != nil { + return err + } + return nil +} diff --git a/test/end2end_test.go b/test/end2end_test.go index 981a72e91..592a6b4d7 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -280,10 +280,11 @@ func listTestEnv() []env { if runtime.GOOS == "windows" { return []env{env{"tcp", nil, ""}, env{"tcp", nil, "tls"}} } - return []env{env{"tcp", nil, ""}, env{"tcp", nil, "tls"}, env{"unix", unixDialer, ""}, env{"unix", unixDialer, "tls"}} + //return []env{env{"tcp", nil, ""}, env{"tcp", nil, "tls"}, env{"unix", unixDialer, ""}, env{"unix", unixDialer, "tls"}} + return []env{env{"unix", unixDialer, ""}} } -func setUp(maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) { +func setUp(healthCheck bool, maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) { sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(maxStream)} la := ":0" switch e.network { @@ -303,6 +304,9 @@ func setUp(maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) { sopts = append(sopts, grpc.Creds(creds)) } s = grpc.NewServer(sopts...) + if healthCheck { + grpc.Enable(s) + } testpb.RegisterTestServiceServer(s, &testServer{}) go s.Serve(lis) addr := la @@ -342,7 +346,7 @@ func TestTimeoutOnDeadServer(t *testing.T) { } func testTimeoutOnDeadServer(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) s.Stop() // Set -1 as the timeout to make sure if transportMonitor gets error @@ -355,6 +359,52 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { cc.Close() } +func TestHealthCheckOnSucceed(t *testing.T) { + for _, e := range listTestEnv() { + testHealthCheckOnSucceed(t, e) + } +} + +func testHealthCheckOnSucceed(t *testing.T, e env) { + s, cc := setUp(true, math.MaxUint32, e) + defer tearDown(s, cc) + in := new(grpc.HealthCheckRequest) + if err := grpc.Check(1*time.Second, cc, in); err != nil { + t.Fatalf("HealthCheck(_)=_, %v, want ", err) + } +} + +func TestHealthCheckOnFailure(t *testing.T) { + for _, e := range listTestEnv() { + testHealthCheckOnFailure(t, e) + } +} + +func testHealthCheckOnFailure(t *testing.T, e env) { + s, cc := setUp(true, math.MaxUint32, e) + defer tearDown(s, cc) + in := new(grpc.HealthCheckRequest) + if err := grpc.Check(0*time.Second, cc, in); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { + t.Fatalf("HealthCheck(_)=_, %v, want ", err) + } +} + +func TestHealthCheckOff(t *testing.T) { + for _, e := range listTestEnv() { + testHealthCheckOff(t, e) + } +} + +func testHealthCheckOff(t *testing.T, e env) { + s, cc := setUp(false, math.MaxUint32, e) + defer tearDown(s, cc) + in := new(grpc.HealthCheckRequest) + err := grpc.Check(1 * time.Second, cc, in) + if err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.HealthCheck") { + t.Fatalf("HealthCheck(_)=_, %v, want ", err) + } +} + func TestEmptyUnary(t *testing.T) { for _, e := range listTestEnv() { testEmptyUnary(t, e) @@ -362,7 +412,7 @@ func TestEmptyUnary(t *testing.T) { } func testEmptyUnary(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}) @@ -378,7 +428,7 @@ func TestFailedEmptyUnary(t *testing.T) { } func testFailedEmptyUnary(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) ctx := metadata.NewContext(context.Background(), testMetadata) @@ -394,7 +444,7 @@ func TestLargeUnary(t *testing.T) { } func testLargeUnary(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 271828 @@ -422,7 +472,7 @@ func TestMetadataUnaryRPC(t *testing.T) { } func testMetadataUnaryRPC(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -476,7 +526,7 @@ func TestRetry(t *testing.T) { // TODO(zhaoq): Refactor to make this clearer and add more cases to test racy // and error-prone paths. func testRetry(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) var wg sync.WaitGroup @@ -506,7 +556,7 @@ func TestRPCTimeout(t *testing.T) { // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism. func testRPCTimeout(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -532,7 +582,7 @@ func TestCancel(t *testing.T) { } func testCancel(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -564,7 +614,7 @@ func TestPingPong(t *testing.T) { } func testPingPong(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) stream, err := tc.FullDuplexCall(context.Background()) @@ -615,7 +665,7 @@ func TestMetadataStreamingRPC(t *testing.T) { } func testMetadataStreamingRPC(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) ctx := metadata.NewContext(context.Background(), testMetadata) @@ -672,7 +722,7 @@ func TestServerStreaming(t *testing.T) { } func testServerStreaming(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) @@ -724,7 +774,7 @@ func TestFailedServerStreaming(t *testing.T) { } func testFailedServerStreaming(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) @@ -754,7 +804,7 @@ func TestClientStreaming(t *testing.T) { } func testClientStreaming(t *testing.T, e env) { - s, cc := setUp(math.MaxUint32, e) + s, cc := setUp(false, math.MaxUint32, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) stream, err := tc.StreamingInputCall(context.Background()) @@ -789,7 +839,7 @@ func TestExceedMaxStreamsLimit(t *testing.T) { func testExceedMaxStreamsLimit(t *testing.T, e env) { // Only allows 1 live stream per server transport. - s, cc := setUp(1, e) + s, cc := setUp(false, 1, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) // Perform a unary RPC to make sure the new settings were propagated to the client.