grpc: Fix cardinality violations in client streaming and unary RPCs (#8330)

This commit is contained in:
Pranjali-2501 2025-05-28 16:06:43 +05:30 committed by GitHub
parent fb223f78b8
commit ec4810caeb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 35 additions and 2 deletions

View File

@ -1171,7 +1171,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
} else if err != nil {
return toRPCErr(err)
}
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}
func (a *csAttempt) finish(err error) {
@ -1495,7 +1495,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
} else if err != nil {
return toRPCErr(err)
}
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
}
func (as *addrConnStream) finish(err error) {

View File

@ -3740,6 +3740,39 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) {
}
}
// Tests that a client receives a cardinality violation error for client-streaming
// RPCs if the server call SendMsg multiple times.
func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) {
ss := stubserver.StubServer{
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
if err := stream.SendMsg(&testpb.StreamingInputCallResponse{}); err != nil {
t.Errorf("stream.SendMsg(_) = %v, want <nil>", err)
}
if err := stream.SendMsg(&testpb.StreamingInputCallResponse{}); err != nil {
t.Errorf("stream.SendMsg(_) = %v, want <nil>", err)
}
return nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := ss.Client.StreamingInputCall(ctx)
if err != nil {
t.Fatalf(".StreamingInputCall(_) = _, %v, want <nil>", err)
}
if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil {
t.Fatalf("stream.Send(_) = %v, want <nil>", err)
}
if _, err := stream.CloseAndRecv(); status.Code(err) != codes.Internal {
t.Fatalf("stream.CloseAndRecv() = %v, want error with status code %s", err, codes.Internal)
}
}
func (s) TestExceedMaxStreamsLimit(t *testing.T) {
for _, e := range listTestEnv() {
testExceedMaxStreamsLimit(t, e)