Avoid creating transport stream error outside of transport

This commit is contained in:
Menghan Li 2016-08-31 16:56:46 -07:00
parent 79b7c34917
commit 5e734ab23e
5 changed files with 24 additions and 13 deletions

View File

@ -96,7 +96,7 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
}
outBuf, err := encode(codec, args, compressor, cbuf)
if err != nil {
return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err)
return nil, Errorf(codes.Internal, "grpc: %v", err)
}
err = t.Write(stream, outBuf, opts)
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method

View File

@ -303,10 +303,10 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
case compressionNone:
case compressionMade:
if dc == nil || recvCompress != dc.Type() {
return transport.StreamErrorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
return Errorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
}
default:
return transport.StreamErrorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
return Errorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
}
return nil
}

View File

@ -547,7 +547,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
return err
}
if err == io.ErrUnexpectedEOF {
err = transport.StreamError{Code: codes.Internal, Desc: "io.ErrUnexpectedEOF"}
err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
}
if err != nil {
switch err := err.(type) {
@ -569,8 +569,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
switch err := err.(type) {
case transport.StreamError:
if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
case *rpcError:
if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
}
default:

View File

@ -296,7 +296,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
}()
if err != nil {
return transport.StreamErrorf(codes.Internal, "grpc: %v", err)
return Errorf(codes.Internal, "grpc: %v", err)
}
return cs.t.Write(cs.s, out, &transport.Options{Last: false})
}
@ -468,10 +468,13 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
}
}()
if err != nil {
err = transport.StreamErrorf(codes.Internal, "grpc: %v", err)
err = Errorf(codes.Internal, "grpc: %v", err)
return err
}
return ss.t.Write(ss.s, out, &transport.Options{Last: false})
if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
return nil
}
func (ss *serverStream) RecvMsg(m interface{}) (err error) {
@ -489,5 +492,14 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
return recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize)
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize); err != nil {
if err == io.EOF {
return err
}
if err == io.ErrUnexpectedEOF {
err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
}
return toRPCErr(err)
}
return nil
}

View File

@ -66,7 +66,6 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/transport"
)
var (
@ -2280,8 +2279,8 @@ func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) {
case <-time.After(3 * time.Second):
t.Fatal("timeout waiting for error")
}
if se, ok := got.(transport.StreamError); !ok || se.Code != codes.Canceled {
t.Errorf("error = %#v; want transport.StreamError with code Canceled", got)
if grpc.Code(got) != codes.Canceled {
t.Errorf("error = %#v; want error code %s", got, codes.Canceled)
}
})
}