stream: fix panic caused by failing to get a transport for a retry attempt (#2958)

This commit is contained in:
Can Guler 2019-08-06 15:36:33 -07:00 committed by GitHub
parent a2bdfb40ff
commit 1f154c6e18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 23 additions and 13 deletions

View File

@ -327,8 +327,10 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
return cs, nil return cs, nil
} }
// newAttemptLocked creates a new attempt with a transport.
// If it succeeds, then it replaces clientStream's attempt with this new attempt.
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error { func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error {
cs.attempt = &csAttempt{ newAttempt := &csAttempt{
cs: cs, cs: cs,
dc: cs.cc.dopts.dc, dc: cs.cc.dopts.dc,
statsHandler: sh, statsHandler: sh,
@ -345,8 +347,9 @@ func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) er
if trInfo != nil { if trInfo != nil {
trInfo.firstLine.SetRemoteAddr(t.RemoteAddr()) trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
} }
cs.attempt.t = t newAttempt.t = t
cs.attempt.done = done newAttempt.done = done
cs.attempt = newAttempt
return nil return nil
} }
@ -399,7 +402,14 @@ type clientStream struct {
numRetries int // exclusive of transparent retry attempt(s) numRetries int // exclusive of transparent retry attempt(s)
numRetriesSincePushback int // retries since pushback; to reset backoff numRetriesSincePushback int // retries since pushback; to reset backoff
finished bool // TODO: replace with atomic cmpxchg or sync.Once? finished bool // TODO: replace with atomic cmpxchg or sync.Once?
attempt *csAttempt // the active client stream attempt // attempt is the active client stream attempt.
// The only place where it is written is the newAttemptLocked method and this method never writes nil.
// So, attempt can be nil only inside newClientStream function when clientStream is first created.
// One of the first things done after clientStream's creation, is to call newAttemptLocked which either
// assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
// then newClientStream calls finish on the clientStream and returns. So, finish method is the only
// place where we need to check if the attempt is nil.
attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously. // TODO(hedging): hedging will have multiple attempts simultaneously.
committed bool // active attempt committed for retry? committed bool // active attempt committed for retry?
buffer []func(a *csAttempt) error // operations to replay on retry buffer []func(a *csAttempt) error // operations to replay on retry
@ -805,13 +815,13 @@ func (cs *clientStream) finish(err error) {
} }
if cs.attempt != nil { if cs.attempt != nil {
cs.attempt.finish(err) cs.attempt.finish(err)
}
// after functions all rely upon having a stream. // after functions all rely upon having a stream.
if cs.attempt.s != nil { if cs.attempt.s != nil {
for _, o := range cs.opts { for _, o := range cs.opts {
o.after(cs.callInfo) o.after(cs.callInfo)
} }
} }
}
cs.cancel() cs.cancel()
} }