remove transportSeq

This commit is contained in:
iamqizhao 2015-09-21 18:17:49 -07:00
parent 66a18cfe4f
commit dd992b3748
3 changed files with 9 additions and 12 deletions

View File

@ -142,7 +142,6 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
Delay: false, Delay: false,
} }
var ( var (
ts int // track the transport sequence number
lastErr error // record the error that happened lastErr error // record the error that happened
) )
for { for {
@ -155,7 +154,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if lastErr != nil && c.failFast { if lastErr != nil && c.failFast {
return toRPCErr(lastErr) return toRPCErr(lastErr)
} }
t, ts, err = cc.wait(ctx, ts) t, err = cc.wait(ctx)
if err != nil { if err != nil {
if lastErr != nil { if lastErr != nil {
// This was a retry; return the error from the last attempt. // This was a retry; return the error from the last attempt.

View File

@ -403,19 +403,17 @@ func (cc *ClientConn) transportMonitor() {
} }
// When wait returns, either the new transport is up or ClientConn is // When wait returns, either the new transport is up or ClientConn is
// closing. Used to avoid working on a dying transport. It updates and // closing.
// returns the transport and its version when there is no error. func (cc *ClientConn) wait(ctx context.Context) (transport.ClientTransport, error) {
func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTransport, int, error) {
for { for {
cc.mu.Lock() cc.mu.Lock()
switch { switch {
case cc.state == Shutdown: case cc.state == Shutdown:
cc.mu.Unlock() cc.mu.Unlock()
return nil, 0, ErrClientConnClosing return nil, ErrClientConnClosing
case ts < cc.transportSeq: case cc.state == Ready:
// Worked on a dying transport. Try the new one immediately. cc.mu.Unlock()
defer cc.mu.Unlock() return cc.transport, nil
return cc.transport, cc.transportSeq, nil
default: default:
ready := cc.ready ready := cc.ready
if ready == nil { if ready == nil {
@ -425,7 +423,7 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo
cc.mu.Unlock() cc.mu.Unlock()
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, 0, transport.ContextErr(ctx.Err()) return nil, transport.ContextErr(ctx.Err())
// Wait until the new transport is ready or failed. // Wait until the new transport is ready or failed.
case <-ready: case <-ready:
} }

View File

@ -114,7 +114,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
} }
cs.traceInfo.tr.LazyLog(&cs.traceInfo.firstLine, false) cs.traceInfo.tr.LazyLog(&cs.traceInfo.firstLine, false)
} }
t, _, err := cc.wait(ctx, 0) t, err := cc.wait(ctx)
if err != nil { if err != nil {
return nil, toRPCErr(err) return nil, toRPCErr(err)
} }