Merge pull request #44 from matttproud/refactor/idiomatic-errors

Make error conveyance more idiomatic.
This commit is contained in:
Qi Zhao 2015-02-19 11:01:45 -08:00
commit 0639042e75
14 changed files with 68 additions and 45 deletions

View File

@ -83,7 +83,7 @@ func sendRPC(ctx context.Context, callHdr *transport.CallHdr, t transport.Client
// TODO(zhaoq): Support compression. // TODO(zhaoq): Support compression.
outBuf, err := encode(args, compressionNone) outBuf, err := encode(args, compressionNone)
if err != nil { if err != nil {
return nil, transport.StreamErrorf(codes.Internal, "%v", err) return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err)
} }
err = t.Write(stream, outBuf, opts) err = t.Write(stream, outBuf, opts)
if err != nil { if err != nil {

View File

@ -34,7 +34,7 @@
package grpc package grpc
import ( import (
"fmt" "errors"
"sync" "sync"
"time" "time"
@ -43,6 +43,14 @@ import (
"google.golang.org/grpc/transport" "google.golang.org/grpc/transport"
) )
var (
// ErrUnspecTarget indicates that the target address is unspecified.
ErrUnspecTarget = errors.New("grpc: target is unspecified")
// ErrClosingChan indicates that the operation is illegal because the session
// is closing.
ErrClosingChan = errors.New("grpc: the channel is closing")
)
type dialOptions struct { type dialOptions struct {
protocol string protocol string
authOptions []credentials.Credentials authOptions []credentials.Credentials
@ -73,7 +81,7 @@ func WithPerRPCCredentials(creds credentials.Credentials) DialOption {
// for connection to complete. // for connection to complete.
func Dial(target string, opts ...DialOption) (*ClientConn, error) { func Dial(target string, opts ...DialOption) (*ClientConn, error) {
if target == "" { if target == "" {
return nil, fmt.Errorf("rpc.Dial: target is empty") return nil, ErrUnspecTarget
} }
cc := &ClientConn{ cc := &ClientConn{
target: target, target: target,
@ -119,7 +127,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
cc.transportSeq = 0 cc.transportSeq = 0
if cc.closing { if cc.closing {
cc.mu.Unlock() cc.mu.Unlock()
return fmt.Errorf("rpc.ClientConn.resetTransport: the channel is closing") return ErrClosingChan
} }
cc.mu.Unlock() cc.mu.Unlock()
if closeTransport { if closeTransport {
@ -174,7 +182,7 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo
switch { switch {
case cc.closing: case cc.closing:
cc.mu.Unlock() cc.mu.Unlock()
return nil, 0, fmt.Errorf("ClientConn is closing") return nil, 0, ErrClosingChan
case ts < cc.transportSeq: case ts < cc.transportSeq:
// Worked on a dying transport. Try the new one immediately. // Worked on a dying transport. Try the new one immediately.
defer cc.mu.Unlock() defer cc.mu.Unlock()

View File

@ -235,4 +235,3 @@ func NewServiceAccountFromFile(keyFile string, scope ...string) (Credentials, er
} }
return NewServiceAccountFromKey(jsonKey, scope...) return NewServiceAccountFromKey(jsonKey, scope...)
} }

View File

@ -193,7 +193,7 @@ func doPingPong(tc testpb.TestServiceClient) {
var index int var index int
for index < len(reqSizes) { for index < len(reqSizes) {
respParam := []*testpb.ResponseParameters{ respParam := []*testpb.ResponseParameters{
&testpb.ResponseParameters{ {
Size: proto.Int32(int32(respSizes[index])), Size: proto.Int32(int32(respSizes[index])),
}, },
} }

View File

@ -165,10 +165,10 @@ func recvProto(p *parser, m proto.Message) error {
switch pf { switch pf {
case compressionNone: case compressionNone:
if err := proto.Unmarshal(d, m); err != nil { if err := proto.Unmarshal(d, m); err != nil {
return Errorf(codes.Internal, "%v", err) return Errorf(codes.Internal, "grpc: %v", err)
} }
default: default:
return Errorf(codes.Internal, "compression is not supported yet.") return Errorf(codes.Internal, "gprc: compression is not supported yet.")
} }
return nil return nil
} }
@ -219,7 +219,7 @@ func toRPCErr(err error) error {
desc: e.Desc, desc: e.Desc,
} }
} }
return Errorf(codes.Unknown, "failed to convert %v to rpcErr", err) return Errorf(codes.Unknown, "grpc: failed to convert %v to rpcErr", err)
} }
// convertCode converts a standard Go error into its canonical code. Note that // convertCode converts a standard Go error into its canonical code. Note that

View File

@ -34,6 +34,7 @@
package grpc package grpc
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"log" "log"
@ -145,6 +146,12 @@ func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
s.m[sd.ServiceName] = srv s.m[sd.ServiceName] = srv
} }
var (
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped.
ErrServerStopped = errors.New("grpc: the server has been stopped")
)
// Serve accepts incoming connections on the listener lis, creating a new // Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines // ServerTransport and service goroutine for each. The service goroutines
// read gRPC request and then call the registered handlers to reply to them. // read gRPC request and then call the registered handlers to reply to them.
@ -153,7 +160,7 @@ func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock() s.mu.Lock()
if s.lis == nil { if s.lis == nil {
s.mu.Unlock() s.mu.Unlock()
return fmt.Errorf("the server has been stopped") return ErrServerStopped
} }
s.lis[lis] = true s.lis[lis] = true
s.mu.Unlock() s.mu.Unlock()
@ -340,7 +347,7 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
} }
stream, ok := transport.StreamFromContext(ctx) stream, ok := transport.StreamFromContext(ctx)
if !ok { if !ok {
return fmt.Errorf("rpc: failed to fetch the stream from the context %v", ctx) return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx)
} }
t := stream.ServerTransport() t := stream.ServerTransport()
if t == nil { if t == nil {
@ -358,7 +365,7 @@ func SetTrailer(ctx context.Context, md metadata.MD) error {
} }
stream, ok := transport.StreamFromContext(ctx) stream, ok := transport.StreamFromContext(ctx)
if !ok { if !ok {
return fmt.Errorf("rpc: failed to fetch the stream from the context %v", ctx) return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx)
} }
return stream.SetTrailer(md) return stream.SetTrailer(md)
} }

View File

@ -153,7 +153,7 @@ func (cs *clientStream) SendProto(m proto.Message) (err error) {
}() }()
out, err := encode(m, compressionNone) out, err := encode(m, compressionNone)
if err != nil { if err != nil {
return transport.StreamErrorf(codes.Internal, "%v", err) return transport.StreamErrorf(codes.Internal, "grpc: %v", err)
} }
return cs.t.Write(cs.s, out, &transport.Options{Last: false}) return cs.t.Write(cs.s, out, &transport.Options{Last: false})
} }
@ -167,7 +167,7 @@ func (cs *clientStream) RecvProto(m proto.Message) (err error) {
// Special handling for client streaming rpc. // Special handling for client streaming rpc.
if err = recvProto(cs.p, m); err != io.EOF { if err = recvProto(cs.p, m); err != io.EOF {
cs.t.CloseStream(cs.s, err) cs.t.CloseStream(cs.s, err)
return fmt.Errorf("gRPC client streaming protocol violation: %v, want <EOF>", err) return fmt.Errorf("grpc: client streaming protocol violation: %v, want <EOF>", err)
} }
} }
if _, ok := err.(transport.ConnectionError); !ok { if _, ok := err.(transport.ConnectionError); !ok {
@ -235,7 +235,7 @@ func (ss *serverStream) SetTrailer(md metadata.MD) {
func (ss *serverStream) SendProto(m proto.Message) error { func (ss *serverStream) SendProto(m proto.Message) error {
out, err := encode(m, compressionNone) out, err := encode(m, compressionNone)
if err != nil { if err != nil {
err = transport.StreamErrorf(codes.Internal, "%v", err) err = transport.StreamErrorf(codes.Internal, "grpc: %v", err)
return err return err
} }
return ss.t.Write(ss.s, out, &transport.Options{Last: false}) return ss.t.Write(ss.s, out, &transport.Options{Last: false})

View File

@ -388,7 +388,7 @@ func TestPingPong(t *testing.T) {
var index int var index int
for index < len(reqSizes) { for index < len(reqSizes) {
respParam := []*testpb.ResponseParameters{ respParam := []*testpb.ResponseParameters{
&testpb.ResponseParameters{ {
Size: proto.Int32(int32(respSizes[index])), Size: proto.Int32(int32(respSizes[index])),
}, },
} }
@ -443,7 +443,7 @@ func TestMetadataStreamingRPC(t *testing.T) {
var index int var index int
for index < len(reqSizes) { for index < len(reqSizes) {
respParam := []*testpb.ResponseParameters{ respParam := []*testpb.ResponseParameters{
&testpb.ResponseParameters{ {
Size: proto.Int32(int32(respSizes[index])), Size: proto.Int32(int32(respSizes[index])),
}, },
} }

View File

@ -117,7 +117,7 @@ func newHTTP2Client(addr string, authOpts []credentials.Credentials) (_ ClientTr
conn, connErr = net.Dial("tcp", addr) conn, connErr = net.Dial("tcp", addr)
} }
if connErr != nil { if connErr != nil {
return nil, ConnectionErrorf("%v", connErr) return nil, ConnectionErrorf("grpc/transport: %v", connErr)
} }
defer func() { defer func() {
if err != nil { if err != nil {
@ -127,14 +127,14 @@ func newHTTP2Client(addr string, authOpts []credentials.Credentials) (_ ClientTr
// Send connection preface to server. // Send connection preface to server.
n, err := conn.Write(clientPreface) n, err := conn.Write(clientPreface)
if err != nil { if err != nil {
return nil, ConnectionErrorf("%v", err) return nil, ConnectionErrorf("grpc/transport: %v", err)
} }
if n != len(clientPreface) { if n != len(clientPreface) {
return nil, ConnectionErrorf("Wrting client preface, wrote %d bytes; want %d", n, len(clientPreface)) return nil, ConnectionErrorf("grpc/transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
} }
framer := http2.NewFramer(conn, conn) framer := http2.NewFramer(conn, conn)
if err := framer.WriteSettings(); err != nil { if err := framer.WriteSettings(); err != nil {
return nil, ConnectionErrorf("%v", err) return nil, ConnectionErrorf("grpc/transport: %v", err)
} }
var buf bytes.Buffer var buf bytes.Buffer
t := &http2Client{ t := &http2Client{
@ -225,7 +225,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
default: default:
} }
if err != nil { if err != nil {
return nil, StreamErrorf(codes.InvalidArgument, "%v", err) return nil, StreamErrorf(codes.InvalidArgument, "grpc/transport: %v", err)
} }
for k, v := range m { for k, v := range m {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v}) t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v})
@ -265,7 +265,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
} }
if err != nil { if err != nil {
t.notifyError() t.notifyError()
return nil, ConnectionErrorf("%v", err) return nil, ConnectionErrorf("grpc/transport: %v", err)
} }
} }
s := t.newStream(ctx, callHdr) s := t.newStream(ctx, callHdr)
@ -276,7 +276,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
} }
if uint32(len(t.activeStreams)) >= t.maxStreams { if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock() t.mu.Unlock()
return nil, StreamErrorf(codes.Unavailable, "failed to create new stream because the limit has been reached.") return nil, StreamErrorf(codes.Unavailable, "grpc/transport: failed to create new stream because the limit has been reached.")
} }
t.activeStreams[s.id] = s t.activeStreams[s.id] = s
t.mu.Unlock() t.mu.Unlock()
@ -391,7 +391,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
// invoked. // invoked.
if err := t.framer.WriteData(s.id, endStream, p); err != nil { if err := t.framer.WriteData(s.id, endStream, p); err != nil {
t.notifyError() t.notifyError()
return ConnectionErrorf("%v", err) return ConnectionErrorf("grpc/transport: %v", err)
} }
t.writableChan <- 0 t.writableChan <- 0
if r.Len() == 0 { if r.Len() == 0 {

View File

@ -35,7 +35,7 @@ package transport
import ( import (
"bytes" "bytes"
"fmt" "errors"
"io" "io"
"log" "log"
"math" "math"
@ -45,11 +45,15 @@ import (
"github.com/bradfitz/http2" "github.com/bradfitz/http2"
"github.com/bradfitz/http2/hpack" "github.com/bradfitz/http2/hpack"
"golang.org/x/net/context"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"golang.org/x/net/context"
) )
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state.
var ErrIllegalHeaderWrite = errors.New("grpc/transport: the stream is done or WriteHeader was already called")
// http2Server implements the ServerTransport interface with HTTP2. // http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct { type http2Server struct {
conn net.Conn conn net.Conn
@ -383,7 +387,7 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
} }
if err != nil { if err != nil {
t.Close() t.Close()
return ConnectionErrorf("%v", err) return ConnectionErrorf("grpc/transport: %v", err)
} }
} }
return nil return nil
@ -394,7 +398,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
s.mu.Lock() s.mu.Lock()
if s.headerOk || s.state == streamDone { if s.headerOk || s.state == streamDone {
s.mu.Unlock() s.mu.Unlock()
return fmt.Errorf("transport: the stream is done or WriteHeader was already called") return ErrIllegalHeaderWrite
} }
s.headerOk = true s.headerOk = true
s.mu.Unlock() s.mu.Unlock()
@ -474,7 +478,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
} }
if err := t.framer.WriteHeaders(p); err != nil { if err := t.framer.WriteHeaders(p); err != nil {
t.Close() t.Close()
return ConnectionErrorf("%v", err) return ConnectionErrorf("grpc/transport: %v", err)
} }
t.writableChan <- 0 t.writableChan <- 0
} }
@ -522,7 +526,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
} }
if err := t.framer.WriteData(s.id, false, p); err != nil { if err := t.framer.WriteData(s.id, false, p); err != nil {
t.Close() t.Close()
return ConnectionErrorf("%v", err) return ConnectionErrorf("grpc/transport: %v", err)
} }
t.writableChan <- 0 t.writableChan <- 0
} }

View File

@ -138,7 +138,7 @@ func newHPACKDecoder() *hpackDecoder {
case "grpc-status": case "grpc-status":
code, err := strconv.Atoi(f.Value) code, err := strconv.Atoi(f.Value)
if err != nil { if err != nil {
d.err = StreamErrorf(codes.Internal, "malformed grpc-status: %v", err) d.err = StreamErrorf(codes.Internal, "grpc/transport: malformed grpc-status: %v", err)
return return
} }
d.state.statusCode = codes.Code(code) d.state.statusCode = codes.Code(code)
@ -149,7 +149,7 @@ func newHPACKDecoder() *hpackDecoder {
var err error var err error
d.state.timeout, err = timeoutDecode(f.Value) d.state.timeout, err = timeoutDecode(f.Value)
if err != nil { if err != nil {
d.err = StreamErrorf(codes.Internal, "malformed time-out: %v", err) d.err = StreamErrorf(codes.Internal, "grpc/transport: malformed time-out: %v", err)
return return
} }
case ":path": case ":path":
@ -175,12 +175,12 @@ func (d *hpackDecoder) decodeClientHTTP2Headers(s *Stream, frame headerFrame) (e
d.err = nil d.err = nil
_, err = d.h.Write(frame.HeaderBlockFragment()) _, err = d.h.Write(frame.HeaderBlockFragment())
if err != nil { if err != nil {
err = StreamErrorf(codes.Internal, "HPACK header decode error: %v", err) err = StreamErrorf(codes.Internal, "grpc/transport: HPACK header decode error: %v", err)
} }
if frame.HeadersEnded() { if frame.HeadersEnded() {
if closeErr := d.h.Close(); closeErr != nil && err == nil { if closeErr := d.h.Close(); closeErr != nil && err == nil {
err = StreamErrorf(codes.Internal, "HPACK decoder close error: %v", closeErr) err = StreamErrorf(codes.Internal, "grpc/transport: HPACK decoder close error: %v", closeErr)
} }
endHeaders = true endHeaders = true
} }
@ -195,12 +195,12 @@ func (d *hpackDecoder) decodeServerHTTP2Headers(s *Stream, frame headerFrame) (e
d.err = nil d.err = nil
_, err = d.h.Write(frame.HeaderBlockFragment()) _, err = d.h.Write(frame.HeaderBlockFragment())
if err != nil { if err != nil {
err = StreamErrorf(codes.Internal, "HPACK header decode error: %v", err) err = StreamErrorf(codes.Internal, "grpc/transport: HPACK header decode error: %v", err)
} }
if frame.HeadersEnded() { if frame.HeadersEnded() {
if closeErr := d.h.Close(); closeErr != nil && err == nil { if closeErr := d.h.Close(); closeErr != nil && err == nil {
err = StreamErrorf(codes.Internal, "HPACK decoder close error: %v", closeErr) err = StreamErrorf(codes.Internal, "grpc/transport: HPACK decoder close error: %v", closeErr)
} }
endHeaders = true endHeaders = true
} }
@ -276,12 +276,12 @@ func timeoutEncode(t time.Duration) string {
func timeoutDecode(s string) (time.Duration, error) { func timeoutDecode(s string) (time.Duration, error) {
size := len(s) size := len(s)
if size < 2 { if size < 2 {
return 0, fmt.Errorf("timeout string is too short: %q", s) return 0, fmt.Errorf("grpc/transport: timeout string is too short: %q", s)
} }
unit := timeoutUnit(s[size-1]) unit := timeoutUnit(s[size-1])
d, ok := timeoutUnitToDuration(unit) d, ok := timeoutUnitToDuration(unit)
if !ok { if !ok {
return 0, fmt.Errorf("timeout unit is not recognized: %q", s) return 0, fmt.Errorf("grpc/transport: timeout unit is not recognized: %q", s)
} }
t, err := strconv.ParseInt(s[:size-1], 10, 64) t, err := strconv.ParseInt(s[:size-1], 10, 64)
if err != nil { if err != nil {

View File

@ -75,9 +75,9 @@ func TestTimeoutDecode(t *testing.T) {
err error err error
}{ }{
{"1234S", time.Second * 1234, nil}, {"1234S", time.Second * 1234, nil},
{"1234x", 0, fmt.Errorf("timeout unit is not recognized: %q", "1234x")}, {"1234x", 0, fmt.Errorf("grpc/transport: timeout unit is not recognized: %q", "1234x")},
{"1", 0, fmt.Errorf("timeout string is too short: %q", "1")}, {"1", 0, fmt.Errorf("grpc/transport: timeout string is too short: %q", "1")},
{"", 0, fmt.Errorf("timeout string is too short: %q", "")}, {"", 0, fmt.Errorf("grpc/transport: timeout string is too short: %q", "")},
} { } {
d, err := timeoutDecode(test.s) d, err := timeoutDecode(test.s)
if d != test.d || fmt.Sprint(err) != fmt.Sprint(test.err) { if d != test.d || fmt.Sprint(err) != fmt.Sprint(test.err) {

View File

@ -39,6 +39,7 @@ package transport // import "google.golang.org/grpc/transport"
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -244,13 +245,17 @@ func (s *Stream) StatusDesc() string {
return s.statusDesc return s.statusDesc
} }
// ErrIllegalTrailerSet indicates that the trailer has already been set or it
// is too late to do so.
var ErrIllegalTrailerSet = errors.New("grpc/transport: trailer has been set")
// SetTrailer sets the trailer metadata which will be sent with the RPC status // SetTrailer sets the trailer metadata which will be sent with the RPC status
// by the server. This can only be called at most once. Server side only. // by the server. This can only be called at most once. Server side only.
func (s *Stream) SetTrailer(md metadata.MD) error { func (s *Stream) SetTrailer(md metadata.MD) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if s.trailer != nil { if s.trailer != nil {
return fmt.Errorf("transport: Trailer has been set") return ErrIllegalTrailerSet
} }
s.trailer = md.Copy() s.trailer = md.Copy()
return nil return nil

View File

@ -45,9 +45,9 @@ import (
"testing" "testing"
"time" "time"
"golang.org/x/net/context"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"golang.org/x/net/context"
) )
type server struct { type server struct {