From 4cedec40eb2ccfbe3f56bb15e894903111ada2d2 Mon Sep 17 00:00:00 2001 From: eshitachandwani <59800922+eshitachandwani@users.noreply.github.com> Date: Fri, 25 Apr 2025 13:06:52 +0530 Subject: [PATCH] grpc_test: add tests for client streaming (#8120) --- test/end2end_test.go | 155 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) diff --git a/test/end2end_test.go b/test/end2end_test.go index 906546ef7..a42587715 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -136,6 +136,7 @@ var ( var raceMode bool // set by race.go in race mode +// Note : Do not use this for further tests. type testServer struct { testgrpc.UnimplementedTestServiceServer @@ -3584,6 +3585,160 @@ func testClientStreamingError(t *testing.T, e env) { } } +// Tests that a client receives a cardinality violation error for client-streaming +// RPCs if the server doesn't send a message before returning status OK. +func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) { + // TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()` + // after this is fixed. + t.Skip() + ss := &stubserver.StubServer{ + StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error { + // Returning status OK without sending a response message.This is a + // cardinality violation. + return nil + }, + } + if err := ss.Start(nil); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + stream, err := ss.Client.StreamingInputCall(ctx) + if err != nil { + t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) + } + + _, err = stream.CloseAndRecv() + if err == nil { + t.Fatalf("stream.CloseAndRecv() = %v, want an error", err) + } + if status.Code(err) != codes.Internal { + t.Fatalf("stream.CloseAndRecv() = %v, want error %s", err, codes.Internal) + } +} + +// Tests that the server can continue to receive messages after calling SendAndClose. Although +// this is unexpected, we retain it for backward compatibility. +func (s) TestClientStreaming_ServerHandlerRecvAfterSendAndClose(t *testing.T) { + ss := stubserver.StubServer{ + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { + t.Errorf("stream.SendAndClose(_) = %v, want ", err) + } + if resp, err := stream.Recv(); err != nil || !proto.Equal(resp, &testpb.StreamingInputCallRequest{}) { + t.Errorf("stream.Recv() = %s, %v, want non-nil empty response, ", resp, err) + } + return nil + }, + } + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := ss.Client.StreamingInputCall(ctx) + if err != nil { + t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) + } + if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil { + t.Fatalf("stream.Send(_) = %v, want ", err) + } + if resp, err := stream.CloseAndRecv(); err != nil || !proto.Equal(resp, &testpb.StreamingInputCallResponse{}) { + t.Fatalf("stream.CloseSend() = %v , %v, want non-nil empty response, ", resp, err) + } +} + +// Tests that Recv() on client streaming client blocks till the server handler +// returns even after calling SendAndClose from the server handler. +func (s) TestClientStreaming_RecvWaitsForServerHandlerRetrun(t *testing.T) { + waitForReturn := make(chan struct{}) + ss := stubserver.StubServer{ + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { + t.Errorf("stream.SendAndClose(_) = %v, want ", err) + } + <-waitForReturn + return nil + }, + } + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := ss.Client.StreamingInputCall(ctx) + if err != nil { + t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) + } + // Start Recv in a goroutine to test if it blocks until the server handler returns. + errCh := make(chan error, 1) + go func() { + resp := new(testpb.StreamingInputCallResponse) + err := stream.RecvMsg(resp) + errCh <- err + }() + + // Check that Recv() is blocked. + select { + case err := <-errCh: + t.Fatalf("stream.RecvMsg(_) = %v returned unexpectedly", err) + case <-time.After(defaultTestShortTimeout): + } + + close(waitForReturn) + + // Recv() should return after the server handler returns. + select { + case err := <-errCh: + if err != nil { + t.Fatalf("stream.RecvMsg(_) = %v, want ", err) + } + case <-time.After(defaultTestTimeout): + t.Fatal("Timed out waiting for stream.RecvMsg(_) to return") + } +} + +// Tests the behavior where server handler returns an error after calling +// SendAndClose. It verifies the that client receives nil message and +// non-nil error. +func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { + wantError := status.Error(codes.Internal, "error for testing") + ss := stubserver.StubServer{ + StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { + if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { + t.Errorf("stream.SendAndClose(_) = %v, want ", err) + } + return wantError + }, + } + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := ss.Client.StreamingInputCall(ctx) + if err != nil { + t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) + } + resp, err := stream.CloseAndRecv() + + wantStatus, _ := status.FromError(wantError) + gotStatus, _ := status.FromError(err) + + if gotStatus.Code() != wantStatus.Code() || gotStatus.Message() != wantStatus.Message() || resp != nil { + t.Fatalf("stream.CloseSend() = %v , %v, want , %s", resp, err, wantError) + } +} + func (s) TestExceedMaxStreamsLimit(t *testing.T) { for _, e := range listTestEnv() { testExceedMaxStreamsLimit(t, e)