diff --git a/call.go b/call.go index 2318bf0e9..b9da0b2ae 100644 --- a/call.go +++ b/call.go @@ -83,7 +83,7 @@ func sendRPC(ctx context.Context, callHdr *transport.CallHdr, t transport.Client // TODO(zhaoq): Support compression. outBuf, err := encode(args, compressionNone) if err != nil { - return nil, transport.StreamErrorf(codes.Internal, "%v", err) + return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err) } err = t.Write(stream, outBuf, opts) if err != nil { diff --git a/clientconn.go b/clientconn.go index c2df3e7be..eb546f942 100644 --- a/clientconn.go +++ b/clientconn.go @@ -34,7 +34,7 @@ package grpc import ( - "fmt" + "errors" "sync" "time" @@ -43,6 +43,14 @@ import ( "google.golang.org/grpc/transport" ) +var ( + // ErrUnspecTarget indicates that the target address is unspecified. + ErrUnspecTarget = errors.New("grpc: target is unspecified") + // ErrClosingChan indicates that the operation is illegal because the session + // is closing. + ErrClosingChan = errors.New("grpc: the channel is closing") +) + type dialOptions struct { protocol string authOptions []credentials.Credentials @@ -73,7 +81,7 @@ func WithPerRPCCredentials(creds credentials.Credentials) DialOption { // for connection to complete. func Dial(target string, opts ...DialOption) (*ClientConn, error) { if target == "" { - return nil, fmt.Errorf("rpc.Dial: target is empty") + return nil, ErrUnspecTarget } cc := &ClientConn{ target: target, @@ -119,7 +127,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { cc.transportSeq = 0 if cc.closing { cc.mu.Unlock() - return fmt.Errorf("rpc.ClientConn.resetTransport: the channel is closing") + return ErrClosingChan } cc.mu.Unlock() if closeTransport { @@ -174,7 +182,7 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo switch { case cc.closing: cc.mu.Unlock() - return nil, 0, fmt.Errorf("ClientConn is closing") + return nil, 0, ErrClosingChan case ts < cc.transportSeq: // Worked on a dying transport. Try the new one immediately. defer cc.mu.Unlock() diff --git a/credentials/credentials.go b/credentials/credentials.go index 13cf7857d..7a464aa98 100644 --- a/credentials/credentials.go +++ b/credentials/credentials.go @@ -235,4 +235,3 @@ func NewServiceAccountFromFile(keyFile string, scope ...string) (Credentials, er } return NewServiceAccountFromKey(jsonKey, scope...) } - diff --git a/interop/client/client.go b/interop/client/client.go index 655084d3b..527398667 100644 --- a/interop/client/client.go +++ b/interop/client/client.go @@ -193,7 +193,7 @@ func doPingPong(tc testpb.TestServiceClient) { var index int for index < len(reqSizes) { respParam := []*testpb.ResponseParameters{ - &testpb.ResponseParameters{ + { Size: proto.Int32(int32(respSizes[index])), }, } diff --git a/rpc_util.go b/rpc_util.go index fa6e8b06f..f7b7ff4c1 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -165,10 +165,10 @@ func recvProto(p *parser, m proto.Message) error { switch pf { case compressionNone: if err := proto.Unmarshal(d, m); err != nil { - return Errorf(codes.Internal, "%v", err) + return Errorf(codes.Internal, "grpc: %v", err) } default: - return Errorf(codes.Internal, "compression is not supported yet.") + return Errorf(codes.Internal, "gprc: compression is not supported yet.") } return nil } @@ -219,7 +219,7 @@ func toRPCErr(err error) error { desc: e.Desc, } } - return Errorf(codes.Unknown, "failed to convert %v to rpcErr", err) + return Errorf(codes.Unknown, "grpc: failed to convert %v to rpcErr", err) } // convertCode converts a standard Go error into its canonical code. Note that diff --git a/server.go b/server.go index 02b478b8d..570dc8106 100644 --- a/server.go +++ b/server.go @@ -34,6 +34,7 @@ package grpc import ( + "errors" "fmt" "io" "log" @@ -145,6 +146,12 @@ func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { s.m[sd.ServiceName] = srv } +var ( + // ErrServerStopped indicates that the operation is now illegal because of + // the server being stopped. + ErrServerStopped = errors.New("grpc: the server has been stopped") +) + // Serve accepts incoming connections on the listener lis, creating a new // ServerTransport and service goroutine for each. The service goroutines // read gRPC request and then call the registered handlers to reply to them. @@ -153,7 +160,7 @@ func (s *Server) Serve(lis net.Listener) error { s.mu.Lock() if s.lis == nil { s.mu.Unlock() - return fmt.Errorf("the server has been stopped") + return ErrServerStopped } s.lis[lis] = true s.mu.Unlock() @@ -340,7 +347,7 @@ func SendHeader(ctx context.Context, md metadata.MD) error { } stream, ok := transport.StreamFromContext(ctx) if !ok { - return fmt.Errorf("rpc: failed to fetch the stream from the context %v", ctx) + return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx) } t := stream.ServerTransport() if t == nil { @@ -358,7 +365,7 @@ func SetTrailer(ctx context.Context, md metadata.MD) error { } stream, ok := transport.StreamFromContext(ctx) if !ok { - return fmt.Errorf("rpc: failed to fetch the stream from the context %v", ctx) + return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx) } return stream.SetTrailer(md) } diff --git a/stream.go b/stream.go index 2e5cbcb20..c0266eeb7 100644 --- a/stream.go +++ b/stream.go @@ -153,7 +153,7 @@ func (cs *clientStream) SendProto(m proto.Message) (err error) { }() out, err := encode(m, compressionNone) if err != nil { - return transport.StreamErrorf(codes.Internal, "%v", err) + return transport.StreamErrorf(codes.Internal, "grpc: %v", err) } return cs.t.Write(cs.s, out, &transport.Options{Last: false}) } @@ -167,7 +167,7 @@ func (cs *clientStream) RecvProto(m proto.Message) (err error) { // Special handling for client streaming rpc. if err = recvProto(cs.p, m); err != io.EOF { cs.t.CloseStream(cs.s, err) - return fmt.Errorf("gRPC client streaming protocol violation: %v, want ", err) + return fmt.Errorf("grpc: client streaming protocol violation: %v, want ", err) } } if _, ok := err.(transport.ConnectionError); !ok { @@ -235,7 +235,7 @@ func (ss *serverStream) SetTrailer(md metadata.MD) { func (ss *serverStream) SendProto(m proto.Message) error { out, err := encode(m, compressionNone) if err != nil { - err = transport.StreamErrorf(codes.Internal, "%v", err) + err = transport.StreamErrorf(codes.Internal, "grpc: %v", err) return err } return ss.t.Write(ss.s, out, &transport.Options{Last: false}) diff --git a/test/end2end_test.go b/test/end2end_test.go index 23b55d8a8..eb4c18866 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -388,7 +388,7 @@ func TestPingPong(t *testing.T) { var index int for index < len(reqSizes) { respParam := []*testpb.ResponseParameters{ - &testpb.ResponseParameters{ + { Size: proto.Int32(int32(respSizes[index])), }, } @@ -443,7 +443,7 @@ func TestMetadataStreamingRPC(t *testing.T) { var index int for index < len(reqSizes) { respParam := []*testpb.ResponseParameters{ - &testpb.ResponseParameters{ + { Size: proto.Int32(int32(respSizes[index])), }, } diff --git a/transport/http2_client_transport.go b/transport/http2_client_transport.go index 9fd6283b0..a5232b50a 100644 --- a/transport/http2_client_transport.go +++ b/transport/http2_client_transport.go @@ -117,7 +117,7 @@ func newHTTP2Client(addr string, authOpts []credentials.Credentials) (_ ClientTr conn, connErr = net.Dial("tcp", addr) } if connErr != nil { - return nil, ConnectionErrorf("%v", connErr) + return nil, ConnectionErrorf("grpc/transport: %v", connErr) } defer func() { if err != nil { @@ -127,14 +127,14 @@ func newHTTP2Client(addr string, authOpts []credentials.Credentials) (_ ClientTr // Send connection preface to server. n, err := conn.Write(clientPreface) if err != nil { - return nil, ConnectionErrorf("%v", err) + return nil, ConnectionErrorf("grpc/transport: %v", err) } if n != len(clientPreface) { - return nil, ConnectionErrorf("Wrting client preface, wrote %d bytes; want %d", n, len(clientPreface)) + return nil, ConnectionErrorf("grpc/transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) } framer := http2.NewFramer(conn, conn) if err := framer.WriteSettings(); err != nil { - return nil, ConnectionErrorf("%v", err) + return nil, ConnectionErrorf("grpc/transport: %v", err) } var buf bytes.Buffer t := &http2Client{ @@ -225,7 +225,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea default: } if err != nil { - return nil, StreamErrorf(codes.InvalidArgument, "%v", err) + return nil, StreamErrorf(codes.InvalidArgument, "grpc/transport: %v", err) } for k, v := range m { t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v}) @@ -265,7 +265,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } if err != nil { t.notifyError() - return nil, ConnectionErrorf("%v", err) + return nil, ConnectionErrorf("grpc/transport: %v", err) } } s := t.newStream(ctx, callHdr) @@ -276,7 +276,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } if uint32(len(t.activeStreams)) >= t.maxStreams { t.mu.Unlock() - return nil, StreamErrorf(codes.Unavailable, "failed to create new stream because the limit has been reached.") + return nil, StreamErrorf(codes.Unavailable, "grpc/transport: failed to create new stream because the limit has been reached.") } t.activeStreams[s.id] = s t.mu.Unlock() @@ -391,7 +391,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { // invoked. if err := t.framer.WriteData(s.id, endStream, p); err != nil { t.notifyError() - return ConnectionErrorf("%v", err) + return ConnectionErrorf("grpc/transport: %v", err) } t.writableChan <- 0 if r.Len() == 0 { diff --git a/transport/http2_server_transport.go b/transport/http2_server_transport.go index c95597067..39cd0a629 100644 --- a/transport/http2_server_transport.go +++ b/transport/http2_server_transport.go @@ -35,7 +35,7 @@ package transport import ( "bytes" - "fmt" + "errors" "io" "log" "math" @@ -45,11 +45,15 @@ import ( "github.com/bradfitz/http2" "github.com/bradfitz/http2/hpack" + "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" - "golang.org/x/net/context" ) +// ErrIllegalHeaderWrite indicates that setting header is illegal because of +// the stream's state. +var ErrIllegalHeaderWrite = errors.New("grpc/transport: the stream is done or WriteHeader was already called") + // http2Server implements the ServerTransport interface with HTTP2. type http2Server struct { conn net.Conn @@ -383,7 +387,7 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e } if err != nil { t.Close() - return ConnectionErrorf("%v", err) + return ConnectionErrorf("grpc/transport: %v", err) } } return nil @@ -394,7 +398,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { s.mu.Lock() if s.headerOk || s.state == streamDone { s.mu.Unlock() - return fmt.Errorf("transport: the stream is done or WriteHeader was already called") + return ErrIllegalHeaderWrite } s.headerOk = true s.mu.Unlock() @@ -474,7 +478,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { } if err := t.framer.WriteHeaders(p); err != nil { t.Close() - return ConnectionErrorf("%v", err) + return ConnectionErrorf("grpc/transport: %v", err) } t.writableChan <- 0 } @@ -522,7 +526,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { } if err := t.framer.WriteData(s.id, false, p); err != nil { t.Close() - return ConnectionErrorf("%v", err) + return ConnectionErrorf("grpc/transport: %v", err) } t.writableChan <- 0 } diff --git a/transport/http_util.go b/transport/http_util.go index 2e2d57996..45c001124 100644 --- a/transport/http_util.go +++ b/transport/http_util.go @@ -138,7 +138,7 @@ func newHPACKDecoder() *hpackDecoder { case "grpc-status": code, err := strconv.Atoi(f.Value) if err != nil { - d.err = StreamErrorf(codes.Internal, "malformed grpc-status: %v", err) + d.err = StreamErrorf(codes.Internal, "grpc/transport: malformed grpc-status: %v", err) return } d.state.statusCode = codes.Code(code) @@ -149,7 +149,7 @@ func newHPACKDecoder() *hpackDecoder { var err error d.state.timeout, err = timeoutDecode(f.Value) if err != nil { - d.err = StreamErrorf(codes.Internal, "malformed time-out: %v", err) + d.err = StreamErrorf(codes.Internal, "grpc/transport: malformed time-out: %v", err) return } case ":path": @@ -175,12 +175,12 @@ func (d *hpackDecoder) decodeClientHTTP2Headers(s *Stream, frame headerFrame) (e d.err = nil _, err = d.h.Write(frame.HeaderBlockFragment()) if err != nil { - err = StreamErrorf(codes.Internal, "HPACK header decode error: %v", err) + err = StreamErrorf(codes.Internal, "grpc/transport: HPACK header decode error: %v", err) } if frame.HeadersEnded() { if closeErr := d.h.Close(); closeErr != nil && err == nil { - err = StreamErrorf(codes.Internal, "HPACK decoder close error: %v", closeErr) + err = StreamErrorf(codes.Internal, "grpc/transport: HPACK decoder close error: %v", closeErr) } endHeaders = true } @@ -195,12 +195,12 @@ func (d *hpackDecoder) decodeServerHTTP2Headers(s *Stream, frame headerFrame) (e d.err = nil _, err = d.h.Write(frame.HeaderBlockFragment()) if err != nil { - err = StreamErrorf(codes.Internal, "HPACK header decode error: %v", err) + err = StreamErrorf(codes.Internal, "grpc/transport: HPACK header decode error: %v", err) } if frame.HeadersEnded() { if closeErr := d.h.Close(); closeErr != nil && err == nil { - err = StreamErrorf(codes.Internal, "HPACK decoder close error: %v", closeErr) + err = StreamErrorf(codes.Internal, "grpc/transport: HPACK decoder close error: %v", closeErr) } endHeaders = true } @@ -276,12 +276,12 @@ func timeoutEncode(t time.Duration) string { func timeoutDecode(s string) (time.Duration, error) { size := len(s) if size < 2 { - return 0, fmt.Errorf("timeout string is too short: %q", s) + return 0, fmt.Errorf("grpc/transport: timeout string is too short: %q", s) } unit := timeoutUnit(s[size-1]) d, ok := timeoutUnitToDuration(unit) if !ok { - return 0, fmt.Errorf("timeout unit is not recognized: %q", s) + return 0, fmt.Errorf("grpc/transport: timeout unit is not recognized: %q", s) } t, err := strconv.ParseInt(s[:size-1], 10, 64) if err != nil { diff --git a/transport/http_util_test.go b/transport/http_util_test.go index e90e41165..b4b66295c 100644 --- a/transport/http_util_test.go +++ b/transport/http_util_test.go @@ -75,9 +75,9 @@ func TestTimeoutDecode(t *testing.T) { err error }{ {"1234S", time.Second * 1234, nil}, - {"1234x", 0, fmt.Errorf("timeout unit is not recognized: %q", "1234x")}, - {"1", 0, fmt.Errorf("timeout string is too short: %q", "1")}, - {"", 0, fmt.Errorf("timeout string is too short: %q", "")}, + {"1234x", 0, fmt.Errorf("grpc/transport: timeout unit is not recognized: %q", "1234x")}, + {"1", 0, fmt.Errorf("grpc/transport: timeout string is too short: %q", "1")}, + {"", 0, fmt.Errorf("grpc/transport: timeout string is too short: %q", "")}, } { d, err := timeoutDecode(test.s) if d != test.d || fmt.Sprint(err) != fmt.Sprint(test.err) { diff --git a/transport/transport.go b/transport/transport.go index cf7e9f1d0..7d7fa813e 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -39,6 +39,7 @@ package transport // import "google.golang.org/grpc/transport" import ( "bytes" + "errors" "fmt" "io" "net" @@ -244,13 +245,17 @@ func (s *Stream) StatusDesc() string { return s.statusDesc } +// ErrIllegalTrailerSet indicates that the trailer has already been set or it +// is too late to do so. +var ErrIllegalTrailerSet = errors.New("grpc/transport: trailer has been set") + // SetTrailer sets the trailer metadata which will be sent with the RPC status // by the server. This can only be called at most once. Server side only. func (s *Stream) SetTrailer(md metadata.MD) error { s.mu.Lock() defer s.mu.Unlock() if s.trailer != nil { - return fmt.Errorf("transport: Trailer has been set") + return ErrIllegalTrailerSet } s.trailer = md.Copy() return nil diff --git a/transport/transport_test.go b/transport/transport_test.go index 888b5b47b..bac711607 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -45,9 +45,9 @@ import ( "testing" "time" + "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" - "golang.org/x/net/context" ) type server struct {