From 571bcddd7c4f969a870b0658f97562efd742aa49 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 22 Apr 2015 15:04:17 -0700 Subject: [PATCH] Add cancel_after_begin and cancel_afer_first_response test cases --- interop/client/client.go | 61 +++++++++++++++++++++++++++++++++++++++- server.go | 8 ++---- 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/interop/client/client.go b/interop/client/client.go index b0ad2db65..a537c6967 100644 --- a/interop/client/client.go +++ b/interop/client/client.go @@ -45,7 +45,9 @@ import ( "github.com/golang/protobuf/proto" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/metadata" testpb "google.golang.org/grpc/interop/grpc_testing" ) @@ -66,7 +68,9 @@ var ( server_streaming : single request with response streaming; ping_pong : full-duplex streaming; compute_engine_creds: large_unary with compute engine auth; - service_account_creds: large_unary with service account auth.`) + service_account_creds: large_unary with service account auth; + cancel_after_begin: cancellation after metadata has been sent but before payloads are sent; + cancel_after_first_response: cancellation after receiving 1st message from the server.`) ) var ( @@ -297,6 +301,57 @@ func doServiceAccountCreds(tc testpb.TestServiceClient) { log.Println("ServiceAccountCreds done") } +var ( + testMetadata = metadata.MD{ + "key1": "value1", + "key2": "value2", + } +) + +func doCancelAfterBegin(tc testpb.TestServiceClient) { + ctx, cancel := context.WithCancel(metadata.NewContext(context.Background(), testMetadata)) + stream, err := tc.StreamingInputCall(ctx) + if err != nil { + log.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err) + } + cancel() + _, err = stream.CloseAndRecv() + if grpc.Code(err) != codes.Canceled { + log.Fatalf("%v.CloseAndRecv() got error code %d, want %d", stream, grpc.Code(err), codes.Canceled) + } + log.Println("CancelAfterBegin done") +} + +func doCancelAfterFirstResponse(tc testpb.TestServiceClient) { + ctx, cancel := context.WithCancel(context.Background()) + stream, err := tc.FullDuplexCall(ctx) + if err != nil { + log.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err) + } + respParam := []*testpb.ResponseParameters{ + { + Size: proto.Int32(31415), + }, + } + pl := newPayload(testpb.PayloadType_COMPRESSABLE, 27182) + req := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), + ResponseParameters: respParam, + Payload: pl, + } + if err := stream.Send(req); err != nil { + log.Fatalf("%v.Send(%v) = %v", stream, req, err) + } + if _, err := stream.Recv(); err != nil { + log.Fatalf("%v.Recv() = %v", stream, err) + } + cancel() + if _, err := stream.Recv(); grpc.Code(err) != codes.Canceled { + log.Fatalf("%v compleled with error code %d, want %d", stream, grpc.Code(err), codes.Canceled) + } + log.Println("CancelAfterFirstResponse done") +} + func main() { flag.Parse() serverAddr := net.JoinHostPort(*serverHost, strconv.Itoa(*serverPort)) @@ -354,6 +409,10 @@ func main() { log.Fatalf("TLS is not enabled. TLS is required to execute service_account_creds test case.") } doServiceAccountCreds(tc) + case "cancel_after_begin": + doCancelAfterBegin(tc) + case "cancel_after_first_response": + doCancelAfterFirstResponse(tc) default: log.Fatal("Unsupported test case: ", *testCase) } diff --git a/server.go b/server.go index bf0ad3b34..716ab6add 100644 --- a/server.go +++ b/server.go @@ -283,9 +283,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. statusDesc = err.Error() } } - if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil { - log.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err) - } + t.WriteStatus(stream, statusCode, statusDesc) default: panic(fmt.Sprintf("payload format to be supported: %d", pf)) } @@ -308,9 +306,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.statusDesc = appErr.Error() } } - if err := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc); err != nil { - log.Printf("grpc: Server.processStreamingRPC failed to write status: %v", err) - } + t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc) } func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {