mirror of https://github.com/grpc/grpc-go.git
				
				
				
			Add cancel_after_begin and cancel_afer_first_response test cases
This commit is contained in:
		
							parent
							
								
									3b8eba3a28
								
							
						
					
					
						commit
						571bcddd7c
					
				| 
						 | 
				
			
			@ -45,7 +45,9 @@ import (
 | 
			
		|||
	"github.com/golang/protobuf/proto"
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
	"google.golang.org/grpc"
 | 
			
		||||
	"google.golang.org/grpc/codes"
 | 
			
		||||
	"google.golang.org/grpc/credentials"
 | 
			
		||||
	"google.golang.org/grpc/metadata"
 | 
			
		||||
	testpb "google.golang.org/grpc/interop/grpc_testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -66,7 +68,9 @@ var (
 | 
			
		|||
        server_streaming : single request with response streaming;
 | 
			
		||||
        ping_pong : full-duplex streaming;
 | 
			
		||||
        compute_engine_creds: large_unary with compute engine auth;
 | 
			
		||||
	service_account_creds: large_unary with service account auth.`)
 | 
			
		||||
	service_account_creds: large_unary with service account auth;
 | 
			
		||||
	cancel_after_begin: cancellation after metadata has been sent but before payloads are sent;
 | 
			
		||||
	cancel_after_first_response: cancellation after receiving 1st message from the server.`)
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
| 
						 | 
				
			
			@ -297,6 +301,57 @@ func doServiceAccountCreds(tc testpb.TestServiceClient) {
 | 
			
		|||
	log.Println("ServiceAccountCreds done")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	testMetadata = metadata.MD{
 | 
			
		||||
		"key1": "value1",
 | 
			
		||||
		"key2": "value2",
 | 
			
		||||
	}
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func doCancelAfterBegin(tc testpb.TestServiceClient) {
 | 
			
		||||
	ctx, cancel := context.WithCancel(metadata.NewContext(context.Background(), testMetadata))
 | 
			
		||||
	stream, err := tc.StreamingInputCall(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
 | 
			
		||||
	}
 | 
			
		||||
	cancel()
 | 
			
		||||
	_, err = stream.CloseAndRecv()
 | 
			
		||||
	if grpc.Code(err) != codes.Canceled {
 | 
			
		||||
		log.Fatalf("%v.CloseAndRecv() got error code %d, want %d", stream, grpc.Code(err), codes.Canceled)
 | 
			
		||||
	}
 | 
			
		||||
	log.Println("CancelAfterBegin done")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func doCancelAfterFirstResponse(tc testpb.TestServiceClient) {
 | 
			
		||||
	ctx, cancel := context.WithCancel(context.Background())
 | 
			
		||||
	stream, err := tc.FullDuplexCall(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
 | 
			
		||||
	}
 | 
			
		||||
	respParam := []*testpb.ResponseParameters{
 | 
			
		||||
		{
 | 
			
		||||
			Size: proto.Int32(31415),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	pl := newPayload(testpb.PayloadType_COMPRESSABLE, 27182)
 | 
			
		||||
	req := &testpb.StreamingOutputCallRequest{
 | 
			
		||||
		ResponseType:       testpb.PayloadType_COMPRESSABLE.Enum(),
 | 
			
		||||
		ResponseParameters: respParam,
 | 
			
		||||
		Payload:            pl,
 | 
			
		||||
	}
 | 
			
		||||
	if err := stream.Send(req); err != nil {
 | 
			
		||||
		log.Fatalf("%v.Send(%v) = %v", stream, req, err)
 | 
			
		||||
	}
 | 
			
		||||
	if _, err := stream.Recv(); err != nil {
 | 
			
		||||
		log.Fatalf("%v.Recv() = %v", stream, err)
 | 
			
		||||
	}
 | 
			
		||||
	cancel()
 | 
			
		||||
	if _, err := stream.Recv(); grpc.Code(err) != codes.Canceled {
 | 
			
		||||
		log.Fatalf("%v compleled with error code %d, want %d", stream, grpc.Code(err), codes.Canceled)
 | 
			
		||||
	}
 | 
			
		||||
	log.Println("CancelAfterFirstResponse done")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	flag.Parse()
 | 
			
		||||
	serverAddr := net.JoinHostPort(*serverHost, strconv.Itoa(*serverPort))
 | 
			
		||||
| 
						 | 
				
			
			@ -354,6 +409,10 @@ func main() {
 | 
			
		|||
			log.Fatalf("TLS is not enabled. TLS is required to execute service_account_creds test case.")
 | 
			
		||||
		}
 | 
			
		||||
		doServiceAccountCreds(tc)
 | 
			
		||||
	case "cancel_after_begin":
 | 
			
		||||
		doCancelAfterBegin(tc)
 | 
			
		||||
	case "cancel_after_first_response":
 | 
			
		||||
		doCancelAfterFirstResponse(tc)
 | 
			
		||||
	default:
 | 
			
		||||
		log.Fatal("Unsupported test case: ", *testCase)
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -283,9 +283,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
 | 
			
		|||
					statusDesc = err.Error()
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
 | 
			
		||||
				log.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			t.WriteStatus(stream, statusCode, statusDesc)
 | 
			
		||||
		default:
 | 
			
		||||
			panic(fmt.Sprintf("payload format to be supported: %d", pf))
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			@ -308,9 +306,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
 | 
			
		|||
			ss.statusDesc = appErr.Error()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if err := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc); err != nil {
 | 
			
		||||
		log.Printf("grpc: Server.processStreamingRPC failed to write status: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue