bug fix, typo fix and slight error refactoring

This commit is contained in:
iamqizhao 2016-06-28 16:08:19 -07:00
parent 01ef81a4d9
commit 213a20c4fe
6 changed files with 68 additions and 37 deletions

View File

@ -40,7 +40,6 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/naming"
"google.golang.org/grpc/transport"
)
// Address represents a server the client connects to.
@ -321,7 +320,7 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
for {
select {
case <-ctx.Done():
err = transport.ContextErr(ctx.Err())
err = ctx.Err()
return
case <-ch:
rr.mu.Lock()

12
call.go
View File

@ -155,19 +155,19 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := err.(rpcError); ok {
return err
}
if err == ErrClientConnClosing {
return Errorf(codes.FailedPrecondition, "%v", err)
}
if _, ok := err.(transport.StreamError); ok {
return toRPCErr(err)
}
if _, ok := err.(transport.ConnectionError); ok {
if err == errConnClosing {
if c.failFast {
return toRPCErr(err)
return Errorf(codes.Unavailable, "%v", errConnClosing)
}
continue
}
// ALl the other errors are treated as Internal errors.
// All the other errors are treated as Internal errors.
return Errorf(codes.Internal, "%v", err)
}
if c.traceInfo.tr != nil {

View File

@ -426,7 +426,7 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
addr, put, err := cc.balancer.Get(ctx, opts)
if err != nil {
return nil, nil, err
return nil, nil, toRPCErr(err)
}
cc.mu.RLock()
if cc.conns == nil {
@ -439,7 +439,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
if put != nil {
put()
}
return nil, nil, transport.StreamErrorf(codes.Internal, "grpc: failed to find the transport to send the rpc")
return nil, nil, Errorf(codes.Internal, "grpc: failed to find the transport to send the rpc")
}
t, err := ac.wait(ctx, !opts.BlockingWait)
if err != nil {
@ -661,11 +661,10 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr
ct := ac.transport
ac.mu.Unlock()
return ct, nil
default:
if ac.state == TransientFailure && failFast {
case ac.state == TransientFailure && failFast:
ac.mu.Unlock()
return nil, transport.StreamErrorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure")
}
return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure")
default:
ready := ac.ready
if ready == nil {
ready = make(chan struct{})
@ -674,16 +673,9 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr
ac.mu.Unlock()
select {
case <-ctx.Done():
return nil, transport.ContextErr(ctx.Err())
return nil, toRPCErr(ctx.Err())
// Wait until the new transport is ready or failed.
case <-ready:
ac.mu.Lock()
if ac.state == TransientFailure && failFast {
ac.mu.Unlock()
return nil, transport.StreamErrorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure")
}
ac.mu.Unlock()
}
}
}

View File

@ -389,6 +389,19 @@ func toRPCErr(err error) error {
code: codes.Internal,
desc: e.Desc,
}
default:
switch err {
case context.DeadlineExceeded:
return rpcError{
code: codes.DeadlineExceeded,
desc: err.Error(),
}
case context.Canceled:
return rpcError{
code: codes.Canceled,
desc: err.Error(),
}
}
}
return Errorf(codes.Unknown, "%v", err)
}

View File

@ -102,6 +102,7 @@ type ClientStream interface {
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
var (
t transport.ClientTransport
s *transport.Stream
err error
put func()
)
@ -111,13 +112,6 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
return nil, toRPCErr(err)
}
}
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
return nil, toRPCErr(err)
}
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
@ -130,7 +124,6 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
opts: opts,
c: c,
desc: desc,
put: put,
codec: cc.dopts.codec,
cp: cc.dopts.cp,
dc: cc.dopts.dc,
@ -149,11 +142,47 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false)
ctx = trace.NewContext(ctx, cs.trInfo.tr)
}
s, err := t.NewStream(ctx, callHdr)
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
for {
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := err.(rpcError); ok {
return nil, err
}
if err == ErrClientConnClosing {
return nil, Errorf(codes.FailedPrecondition, "%v", err)
}
if err == errConnClosing {
if c.failFast {
return nil, Errorf(codes.Unavailable, "%v", errConnClosing)
}
continue
}
// All the other errors are treated as Internal errors.
return nil, Errorf(codes.Internal, "%v", err)
}
s, err = t.NewStream(ctx, callHdr)
if err != nil {
if put != nil {
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok {
if c.failFast {
cs.finish(err)
return nil, toRPCErr(err)
}
continue
}
return nil, toRPCErr(err)
}
break
}
cs.put = put
cs.t = t
cs.s = s
cs.p = &parser{r: s}

View File

@ -1148,9 +1148,7 @@ func testNoService(t *testing.T, e env) {
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
// Make sure setting ack has been sent.
time.Sleep(20 * time.Millisecond)
stream, err := tc.FullDuplexCall(te.ctx)
stream, err := tc.FullDuplexCall(te.ctx, grpc.FailFast(false))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}