From a720ae6f48457cb25b5836a9a44a9bc059fa0179 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Thu, 19 Feb 2015 12:57:41 +0100 Subject: [PATCH] Make error conveyance more idiomatic. This commit applies two bulk changes to the grpc error reporting mechanisms: (1.) Error strings for errors that originate within grpc are prefixed with the package name for better clarity for where they originate since they could percolate up in the users call chains to the originator. (2.) Errors that are, in fact, singletons have been converted from fmt.Errorf to errors.New and assigned as package-level variables. This bodes particularly well for enabling API customers to elect to handle these errors upon receipt via equality comparison. This had been previous impossible with the original API. Supplementarily, ``gofmt -w -s=true`` has been run on the repository to cleanup residual defects, and it has detected and repaired a few. TEST=Manual go test ./... --- call.go | 2 +- clientconn.go | 16 ++++++++++++---- credentials/credentials.go | 1 - interop/client/client.go | 2 +- rpc_util.go | 6 +++--- server.go | 13 ++++++++++--- stream.go | 6 +++--- test/end2end_test.go | 4 ++-- transport/http2_client_transport.go | 16 ++++++++-------- transport/http2_server_transport.go | 16 ++++++++++------ transport/http_util.go | 16 ++++++++-------- transport/http_util_test.go | 6 +++--- transport/transport.go | 7 ++++++- transport/transport_test.go | 2 +- 14 files changed, 68 insertions(+), 45 deletions(-) diff --git a/call.go b/call.go index 2318bf0e9..b9da0b2ae 100644 --- a/call.go +++ b/call.go @@ -83,7 +83,7 @@ func sendRPC(ctx context.Context, callHdr *transport.CallHdr, t transport.Client // TODO(zhaoq): Support compression. outBuf, err := encode(args, compressionNone) if err != nil { - return nil, transport.StreamErrorf(codes.Internal, "%v", err) + return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err) } err = t.Write(stream, outBuf, opts) if err != nil { diff --git a/clientconn.go b/clientconn.go index c2df3e7be..eb546f942 100644 --- a/clientconn.go +++ b/clientconn.go @@ -34,7 +34,7 @@ package grpc import ( - "fmt" + "errors" "sync" "time" @@ -43,6 +43,14 @@ import ( "google.golang.org/grpc/transport" ) +var ( + // ErrUnspecTarget indicates that the target address is unspecified. + ErrUnspecTarget = errors.New("grpc: target is unspecified") + // ErrClosingChan indicates that the operation is illegal because the session + // is closing. + ErrClosingChan = errors.New("grpc: the channel is closing") +) + type dialOptions struct { protocol string authOptions []credentials.Credentials @@ -73,7 +81,7 @@ func WithPerRPCCredentials(creds credentials.Credentials) DialOption { // for connection to complete. func Dial(target string, opts ...DialOption) (*ClientConn, error) { if target == "" { - return nil, fmt.Errorf("rpc.Dial: target is empty") + return nil, ErrUnspecTarget } cc := &ClientConn{ target: target, @@ -119,7 +127,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { cc.transportSeq = 0 if cc.closing { cc.mu.Unlock() - return fmt.Errorf("rpc.ClientConn.resetTransport: the channel is closing") + return ErrClosingChan } cc.mu.Unlock() if closeTransport { @@ -174,7 +182,7 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo switch { case cc.closing: cc.mu.Unlock() - return nil, 0, fmt.Errorf("ClientConn is closing") + return nil, 0, ErrClosingChan case ts < cc.transportSeq: // Worked on a dying transport. Try the new one immediately. defer cc.mu.Unlock() diff --git a/credentials/credentials.go b/credentials/credentials.go index 13cf7857d..7a464aa98 100644 --- a/credentials/credentials.go +++ b/credentials/credentials.go @@ -235,4 +235,3 @@ func NewServiceAccountFromFile(keyFile string, scope ...string) (Credentials, er } return NewServiceAccountFromKey(jsonKey, scope...) } - diff --git a/interop/client/client.go b/interop/client/client.go index 655084d3b..527398667 100644 --- a/interop/client/client.go +++ b/interop/client/client.go @@ -193,7 +193,7 @@ func doPingPong(tc testpb.TestServiceClient) { var index int for index < len(reqSizes) { respParam := []*testpb.ResponseParameters{ - &testpb.ResponseParameters{ + { Size: proto.Int32(int32(respSizes[index])), }, } diff --git a/rpc_util.go b/rpc_util.go index fa6e8b06f..f7b7ff4c1 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -165,10 +165,10 @@ func recvProto(p *parser, m proto.Message) error { switch pf { case compressionNone: if err := proto.Unmarshal(d, m); err != nil { - return Errorf(codes.Internal, "%v", err) + return Errorf(codes.Internal, "grpc: %v", err) } default: - return Errorf(codes.Internal, "compression is not supported yet.") + return Errorf(codes.Internal, "gprc: compression is not supported yet.") } return nil } @@ -219,7 +219,7 @@ func toRPCErr(err error) error { desc: e.Desc, } } - return Errorf(codes.Unknown, "failed to convert %v to rpcErr", err) + return Errorf(codes.Unknown, "grpc: failed to convert %v to rpcErr", err) } // convertCode converts a standard Go error into its canonical code. Note that diff --git a/server.go b/server.go index 02b478b8d..570dc8106 100644 --- a/server.go +++ b/server.go @@ -34,6 +34,7 @@ package grpc import ( + "errors" "fmt" "io" "log" @@ -145,6 +146,12 @@ func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { s.m[sd.ServiceName] = srv } +var ( + // ErrServerStopped indicates that the operation is now illegal because of + // the server being stopped. + ErrServerStopped = errors.New("grpc: the server has been stopped") +) + // Serve accepts incoming connections on the listener lis, creating a new // ServerTransport and service goroutine for each. The service goroutines // read gRPC request and then call the registered handlers to reply to them. @@ -153,7 +160,7 @@ func (s *Server) Serve(lis net.Listener) error { s.mu.Lock() if s.lis == nil { s.mu.Unlock() - return fmt.Errorf("the server has been stopped") + return ErrServerStopped } s.lis[lis] = true s.mu.Unlock() @@ -340,7 +347,7 @@ func SendHeader(ctx context.Context, md metadata.MD) error { } stream, ok := transport.StreamFromContext(ctx) if !ok { - return fmt.Errorf("rpc: failed to fetch the stream from the context %v", ctx) + return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx) } t := stream.ServerTransport() if t == nil { @@ -358,7 +365,7 @@ func SetTrailer(ctx context.Context, md metadata.MD) error { } stream, ok := transport.StreamFromContext(ctx) if !ok { - return fmt.Errorf("rpc: failed to fetch the stream from the context %v", ctx) + return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx) } return stream.SetTrailer(md) } diff --git a/stream.go b/stream.go index 2e5cbcb20..c0266eeb7 100644 --- a/stream.go +++ b/stream.go @@ -153,7 +153,7 @@ func (cs *clientStream) SendProto(m proto.Message) (err error) { }() out, err := encode(m, compressionNone) if err != nil { - return transport.StreamErrorf(codes.Internal, "%v", err) + return transport.StreamErrorf(codes.Internal, "grpc: %v", err) } return cs.t.Write(cs.s, out, &transport.Options{Last: false}) } @@ -167,7 +167,7 @@ func (cs *clientStream) RecvProto(m proto.Message) (err error) { // Special handling for client streaming rpc. if err = recvProto(cs.p, m); err != io.EOF { cs.t.CloseStream(cs.s, err) - return fmt.Errorf("gRPC client streaming protocol violation: %v, want ", err) + return fmt.Errorf("grpc: client streaming protocol violation: %v, want ", err) } } if _, ok := err.(transport.ConnectionError); !ok { @@ -235,7 +235,7 @@ func (ss *serverStream) SetTrailer(md metadata.MD) { func (ss *serverStream) SendProto(m proto.Message) error { out, err := encode(m, compressionNone) if err != nil { - err = transport.StreamErrorf(codes.Internal, "%v", err) + err = transport.StreamErrorf(codes.Internal, "grpc: %v", err) return err } return ss.t.Write(ss.s, out, &transport.Options{Last: false}) diff --git a/test/end2end_test.go b/test/end2end_test.go index 23b55d8a8..eb4c18866 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -388,7 +388,7 @@ func TestPingPong(t *testing.T) { var index int for index < len(reqSizes) { respParam := []*testpb.ResponseParameters{ - &testpb.ResponseParameters{ + { Size: proto.Int32(int32(respSizes[index])), }, } @@ -443,7 +443,7 @@ func TestMetadataStreamingRPC(t *testing.T) { var index int for index < len(reqSizes) { respParam := []*testpb.ResponseParameters{ - &testpb.ResponseParameters{ + { Size: proto.Int32(int32(respSizes[index])), }, } diff --git a/transport/http2_client_transport.go b/transport/http2_client_transport.go index 9fd6283b0..a5232b50a 100644 --- a/transport/http2_client_transport.go +++ b/transport/http2_client_transport.go @@ -117,7 +117,7 @@ func newHTTP2Client(addr string, authOpts []credentials.Credentials) (_ ClientTr conn, connErr = net.Dial("tcp", addr) } if connErr != nil { - return nil, ConnectionErrorf("%v", connErr) + return nil, ConnectionErrorf("grpc/transport: %v", connErr) } defer func() { if err != nil { @@ -127,14 +127,14 @@ func newHTTP2Client(addr string, authOpts []credentials.Credentials) (_ ClientTr // Send connection preface to server. n, err := conn.Write(clientPreface) if err != nil { - return nil, ConnectionErrorf("%v", err) + return nil, ConnectionErrorf("grpc/transport: %v", err) } if n != len(clientPreface) { - return nil, ConnectionErrorf("Wrting client preface, wrote %d bytes; want %d", n, len(clientPreface)) + return nil, ConnectionErrorf("grpc/transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) } framer := http2.NewFramer(conn, conn) if err := framer.WriteSettings(); err != nil { - return nil, ConnectionErrorf("%v", err) + return nil, ConnectionErrorf("grpc/transport: %v", err) } var buf bytes.Buffer t := &http2Client{ @@ -225,7 +225,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea default: } if err != nil { - return nil, StreamErrorf(codes.InvalidArgument, "%v", err) + return nil, StreamErrorf(codes.InvalidArgument, "grpc/transport: %v", err) } for k, v := range m { t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v}) @@ -265,7 +265,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } if err != nil { t.notifyError() - return nil, ConnectionErrorf("%v", err) + return nil, ConnectionErrorf("grpc/transport: %v", err) } } s := t.newStream(ctx, callHdr) @@ -276,7 +276,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } if uint32(len(t.activeStreams)) >= t.maxStreams { t.mu.Unlock() - return nil, StreamErrorf(codes.Unavailable, "failed to create new stream because the limit has been reached.") + return nil, StreamErrorf(codes.Unavailable, "grpc/transport: failed to create new stream because the limit has been reached.") } t.activeStreams[s.id] = s t.mu.Unlock() @@ -391,7 +391,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { // invoked. if err := t.framer.WriteData(s.id, endStream, p); err != nil { t.notifyError() - return ConnectionErrorf("%v", err) + return ConnectionErrorf("grpc/transport: %v", err) } t.writableChan <- 0 if r.Len() == 0 { diff --git a/transport/http2_server_transport.go b/transport/http2_server_transport.go index c95597067..39cd0a629 100644 --- a/transport/http2_server_transport.go +++ b/transport/http2_server_transport.go @@ -35,7 +35,7 @@ package transport import ( "bytes" - "fmt" + "errors" "io" "log" "math" @@ -45,11 +45,15 @@ import ( "github.com/bradfitz/http2" "github.com/bradfitz/http2/hpack" + "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" - "golang.org/x/net/context" ) +// ErrIllegalHeaderWrite indicates that setting header is illegal because of +// the stream's state. +var ErrIllegalHeaderWrite = errors.New("grpc/transport: the stream is done or WriteHeader was already called") + // http2Server implements the ServerTransport interface with HTTP2. type http2Server struct { conn net.Conn @@ -383,7 +387,7 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e } if err != nil { t.Close() - return ConnectionErrorf("%v", err) + return ConnectionErrorf("grpc/transport: %v", err) } } return nil @@ -394,7 +398,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { s.mu.Lock() if s.headerOk || s.state == streamDone { s.mu.Unlock() - return fmt.Errorf("transport: the stream is done or WriteHeader was already called") + return ErrIllegalHeaderWrite } s.headerOk = true s.mu.Unlock() @@ -474,7 +478,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { } if err := t.framer.WriteHeaders(p); err != nil { t.Close() - return ConnectionErrorf("%v", err) + return ConnectionErrorf("grpc/transport: %v", err) } t.writableChan <- 0 } @@ -522,7 +526,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { } if err := t.framer.WriteData(s.id, false, p); err != nil { t.Close() - return ConnectionErrorf("%v", err) + return ConnectionErrorf("grpc/transport: %v", err) } t.writableChan <- 0 } diff --git a/transport/http_util.go b/transport/http_util.go index 2e2d57996..45c001124 100644 --- a/transport/http_util.go +++ b/transport/http_util.go @@ -138,7 +138,7 @@ func newHPACKDecoder() *hpackDecoder { case "grpc-status": code, err := strconv.Atoi(f.Value) if err != nil { - d.err = StreamErrorf(codes.Internal, "malformed grpc-status: %v", err) + d.err = StreamErrorf(codes.Internal, "grpc/transport: malformed grpc-status: %v", err) return } d.state.statusCode = codes.Code(code) @@ -149,7 +149,7 @@ func newHPACKDecoder() *hpackDecoder { var err error d.state.timeout, err = timeoutDecode(f.Value) if err != nil { - d.err = StreamErrorf(codes.Internal, "malformed time-out: %v", err) + d.err = StreamErrorf(codes.Internal, "grpc/transport: malformed time-out: %v", err) return } case ":path": @@ -175,12 +175,12 @@ func (d *hpackDecoder) decodeClientHTTP2Headers(s *Stream, frame headerFrame) (e d.err = nil _, err = d.h.Write(frame.HeaderBlockFragment()) if err != nil { - err = StreamErrorf(codes.Internal, "HPACK header decode error: %v", err) + err = StreamErrorf(codes.Internal, "grpc/transport: HPACK header decode error: %v", err) } if frame.HeadersEnded() { if closeErr := d.h.Close(); closeErr != nil && err == nil { - err = StreamErrorf(codes.Internal, "HPACK decoder close error: %v", closeErr) + err = StreamErrorf(codes.Internal, "grpc/transport: HPACK decoder close error: %v", closeErr) } endHeaders = true } @@ -195,12 +195,12 @@ func (d *hpackDecoder) decodeServerHTTP2Headers(s *Stream, frame headerFrame) (e d.err = nil _, err = d.h.Write(frame.HeaderBlockFragment()) if err != nil { - err = StreamErrorf(codes.Internal, "HPACK header decode error: %v", err) + err = StreamErrorf(codes.Internal, "grpc/transport: HPACK header decode error: %v", err) } if frame.HeadersEnded() { if closeErr := d.h.Close(); closeErr != nil && err == nil { - err = StreamErrorf(codes.Internal, "HPACK decoder close error: %v", closeErr) + err = StreamErrorf(codes.Internal, "grpc/transport: HPACK decoder close error: %v", closeErr) } endHeaders = true } @@ -276,12 +276,12 @@ func timeoutEncode(t time.Duration) string { func timeoutDecode(s string) (time.Duration, error) { size := len(s) if size < 2 { - return 0, fmt.Errorf("timeout string is too short: %q", s) + return 0, fmt.Errorf("grpc/transport: timeout string is too short: %q", s) } unit := timeoutUnit(s[size-1]) d, ok := timeoutUnitToDuration(unit) if !ok { - return 0, fmt.Errorf("timeout unit is not recognized: %q", s) + return 0, fmt.Errorf("grpc/transport: timeout unit is not recognized: %q", s) } t, err := strconv.ParseInt(s[:size-1], 10, 64) if err != nil { diff --git a/transport/http_util_test.go b/transport/http_util_test.go index e90e41165..b4b66295c 100644 --- a/transport/http_util_test.go +++ b/transport/http_util_test.go @@ -75,9 +75,9 @@ func TestTimeoutDecode(t *testing.T) { err error }{ {"1234S", time.Second * 1234, nil}, - {"1234x", 0, fmt.Errorf("timeout unit is not recognized: %q", "1234x")}, - {"1", 0, fmt.Errorf("timeout string is too short: %q", "1")}, - {"", 0, fmt.Errorf("timeout string is too short: %q", "")}, + {"1234x", 0, fmt.Errorf("grpc/transport: timeout unit is not recognized: %q", "1234x")}, + {"1", 0, fmt.Errorf("grpc/transport: timeout string is too short: %q", "1")}, + {"", 0, fmt.Errorf("grpc/transport: timeout string is too short: %q", "")}, } { d, err := timeoutDecode(test.s) if d != test.d || fmt.Sprint(err) != fmt.Sprint(test.err) { diff --git a/transport/transport.go b/transport/transport.go index cf7e9f1d0..7d7fa813e 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -39,6 +39,7 @@ package transport // import "google.golang.org/grpc/transport" import ( "bytes" + "errors" "fmt" "io" "net" @@ -244,13 +245,17 @@ func (s *Stream) StatusDesc() string { return s.statusDesc } +// ErrIllegalTrailerSet indicates that the trailer has already been set or it +// is too late to do so. +var ErrIllegalTrailerSet = errors.New("grpc/transport: trailer has been set") + // SetTrailer sets the trailer metadata which will be sent with the RPC status // by the server. This can only be called at most once. Server side only. func (s *Stream) SetTrailer(md metadata.MD) error { s.mu.Lock() defer s.mu.Unlock() if s.trailer != nil { - return fmt.Errorf("transport: Trailer has been set") + return ErrIllegalTrailerSet } s.trailer = md.Copy() return nil diff --git a/transport/transport_test.go b/transport/transport_test.go index 888b5b47b..bac711607 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -45,9 +45,9 @@ import ( "testing" "time" + "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" - "golang.org/x/net/context" ) type server struct {