grpc_test: add tests for client streaming (#8120)

This commit is contained in:
eshitachandwani 2025-04-25 13:06:52 +05:30 committed by GitHub
parent 030938e543
commit 4cedec40eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 155 additions and 0 deletions

View File

@ -136,6 +136,7 @@ var (
var raceMode bool // set by race.go in race mode
// Note : Do not use this for further tests.
type testServer struct {
testgrpc.UnimplementedTestServiceServer
@ -3584,6 +3585,160 @@ func testClientStreamingError(t *testing.T, e env) {
}
}
// Tests that a client receives a cardinality violation error for client-streaming
// RPCs if the server doesn't send a message before returning status OK.
func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) {
// TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()`
// after this is fixed.
t.Skip()
ss := &stubserver.StubServer{
StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error {
// Returning status OK without sending a response message.This is a
// cardinality violation.
return nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := ss.Client.StreamingInputCall(ctx)
if err != nil {
t.Fatalf(".StreamingInputCall(_) = _, %v, want <nil>", err)
}
_, err = stream.CloseAndRecv()
if err == nil {
t.Fatalf("stream.CloseAndRecv() = %v, want an error", err)
}
if status.Code(err) != codes.Internal {
t.Fatalf("stream.CloseAndRecv() = %v, want error %s", err, codes.Internal)
}
}
// Tests that the server can continue to receive messages after calling SendAndClose. Although
// this is unexpected, we retain it for backward compatibility.
func (s) TestClientStreaming_ServerHandlerRecvAfterSendAndClose(t *testing.T) {
ss := stubserver.StubServer{
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil {
t.Errorf("stream.SendAndClose(_) = %v, want <nil>", err)
}
if resp, err := stream.Recv(); err != nil || !proto.Equal(resp, &testpb.StreamingInputCallRequest{}) {
t.Errorf("stream.Recv() = %s, %v, want non-nil empty response, <nil>", resp, err)
}
return nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := ss.Client.StreamingInputCall(ctx)
if err != nil {
t.Fatalf(".StreamingInputCall(_) = _, %v, want <nil>", err)
}
if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil {
t.Fatalf("stream.Send(_) = %v, want <nil>", err)
}
if resp, err := stream.CloseAndRecv(); err != nil || !proto.Equal(resp, &testpb.StreamingInputCallResponse{}) {
t.Fatalf("stream.CloseSend() = %v , %v, want non-nil empty response, <nil>", resp, err)
}
}
// Tests that Recv() on client streaming client blocks till the server handler
// returns even after calling SendAndClose from the server handler.
func (s) TestClientStreaming_RecvWaitsForServerHandlerRetrun(t *testing.T) {
waitForReturn := make(chan struct{})
ss := stubserver.StubServer{
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil {
t.Errorf("stream.SendAndClose(_) = %v, want <nil>", err)
}
<-waitForReturn
return nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := ss.Client.StreamingInputCall(ctx)
if err != nil {
t.Fatalf(".StreamingInputCall(_) = _, %v, want <nil>", err)
}
// Start Recv in a goroutine to test if it blocks until the server handler returns.
errCh := make(chan error, 1)
go func() {
resp := new(testpb.StreamingInputCallResponse)
err := stream.RecvMsg(resp)
errCh <- err
}()
// Check that Recv() is blocked.
select {
case err := <-errCh:
t.Fatalf("stream.RecvMsg(_) = %v returned unexpectedly", err)
case <-time.After(defaultTestShortTimeout):
}
close(waitForReturn)
// Recv() should return after the server handler returns.
select {
case err := <-errCh:
if err != nil {
t.Fatalf("stream.RecvMsg(_) = %v, want <nil>", err)
}
case <-time.After(defaultTestTimeout):
t.Fatal("Timed out waiting for stream.RecvMsg(_) to return")
}
}
// Tests the behavior where server handler returns an error after calling
// SendAndClose. It verifies the that client receives nil message and
// non-nil error.
func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) {
wantError := status.Error(codes.Internal, "error for testing")
ss := stubserver.StubServer{
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil {
t.Errorf("stream.SendAndClose(_) = %v, want <nil>", err)
}
return wantError
},
}
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := ss.Client.StreamingInputCall(ctx)
if err != nil {
t.Fatalf(".StreamingInputCall(_) = _, %v, want <nil>", err)
}
resp, err := stream.CloseAndRecv()
wantStatus, _ := status.FromError(wantError)
gotStatus, _ := status.FromError(err)
if gotStatus.Code() != wantStatus.Code() || gotStatus.Message() != wantStatus.Message() || resp != nil {
t.Fatalf("stream.CloseSend() = %v , %v, want <nil>, %s", resp, err, wantError)
}
}
func (s) TestExceedMaxStreamsLimit(t *testing.T) {
for _, e := range listTestEnv() {
testExceedMaxStreamsLimit(t, e)