mirror of https://github.com/grpc/grpc-go.git
Do not create new addrConn when connection error happens (#1369)
This commit is contained in:
parent
2b21bfb96b
commit
98eab9baf6
|
@ -559,15 +559,16 @@ func (cc *ClientConn) scWatcher() {
|
||||||
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
|
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
|
||||||
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
|
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
|
||||||
// If tearDownErr is nil, errConnDrain will be used instead.
|
// If tearDownErr is nil, errConnDrain will be used instead.
|
||||||
|
//
|
||||||
|
// We should never need to replace an addrConn with a new one. This function is only used
|
||||||
|
// as newAddrConn to create new addrConn.
|
||||||
|
// TODO rename this function and clean up the code.
|
||||||
func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
|
func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
|
||||||
ac := &addrConn{
|
ac := &addrConn{
|
||||||
cc: cc,
|
cc: cc,
|
||||||
addr: addr,
|
addr: addr,
|
||||||
dopts: cc.dopts,
|
dopts: cc.dopts,
|
||||||
}
|
}
|
||||||
cc.mu.RLock()
|
|
||||||
ac.dopts.copts.KeepaliveParams = cc.mkp
|
|
||||||
cc.mu.RUnlock()
|
|
||||||
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
|
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
|
||||||
ac.stateCV = sync.NewCond(&ac.mu)
|
ac.stateCV = sync.NewCond(&ac.mu)
|
||||||
if EnableTracing {
|
if EnableTracing {
|
||||||
|
@ -598,10 +599,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
|
||||||
cc.mu.Unlock()
|
cc.mu.Unlock()
|
||||||
if stale != nil {
|
if stale != nil {
|
||||||
// There is an addrConn alive on ac.addr already. This could be due to
|
// There is an addrConn alive on ac.addr already. This could be due to
|
||||||
// 1) a buggy Balancer notifies duplicated Addresses;
|
// a buggy Balancer that reports duplicated Addresses.
|
||||||
// 2) goaway was received, a new ac will replace the old ac.
|
|
||||||
// The old ac should be deleted from cc.conns, but the
|
|
||||||
// underlying transport should drain rather than close.
|
|
||||||
if tearDownErr == nil {
|
if tearDownErr == nil {
|
||||||
// tearDownErr is nil if resetAddrConn is called by
|
// tearDownErr is nil if resetAddrConn is called by
|
||||||
// 1) Dial
|
// 1) Dial
|
||||||
|
@ -828,26 +826,44 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti
|
||||||
return ac.state, nil
|
return ac.state, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ac *addrConn) resetTransport(closeTransport bool) error {
|
// resetTransport recreates a transport to the address for ac.
|
||||||
|
// For the old transport:
|
||||||
|
// - if drain is true, it will be gracefully closed.
|
||||||
|
// - otherwise, it will be closed.
|
||||||
|
func (ac *addrConn) resetTransport(drain bool) error {
|
||||||
|
ac.mu.Lock()
|
||||||
|
if ac.state == Shutdown {
|
||||||
|
ac.mu.Unlock()
|
||||||
|
return errConnClosing
|
||||||
|
}
|
||||||
|
ac.printf("connecting")
|
||||||
|
if ac.down != nil {
|
||||||
|
ac.down(downErrorf(false, true, "%v", errNetworkIO))
|
||||||
|
ac.down = nil
|
||||||
|
}
|
||||||
|
ac.state = Connecting
|
||||||
|
ac.stateCV.Broadcast()
|
||||||
|
t := ac.transport
|
||||||
|
ac.transport = nil
|
||||||
|
ac.mu.Unlock()
|
||||||
|
if t != nil {
|
||||||
|
if drain {
|
||||||
|
t.GracefulClose()
|
||||||
|
} else {
|
||||||
|
t.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ac.cc.mu.RLock()
|
||||||
|
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
|
||||||
|
ac.cc.mu.RUnlock()
|
||||||
for retries := 0; ; retries++ {
|
for retries := 0; ; retries++ {
|
||||||
ac.mu.Lock()
|
ac.mu.Lock()
|
||||||
ac.printf("connecting")
|
|
||||||
if ac.state == Shutdown {
|
if ac.state == Shutdown {
|
||||||
// ac.tearDown(...) has been invoked.
|
// ac.tearDown(...) has been invoked.
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
return errConnClosing
|
return errConnClosing
|
||||||
}
|
}
|
||||||
if ac.down != nil {
|
|
||||||
ac.down(downErrorf(false, true, "%v", errNetworkIO))
|
|
||||||
ac.down = nil
|
|
||||||
}
|
|
||||||
ac.state = Connecting
|
|
||||||
ac.stateCV.Broadcast()
|
|
||||||
t := ac.transport
|
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
if closeTransport && t != nil {
|
|
||||||
t.Close()
|
|
||||||
}
|
|
||||||
sleepTime := ac.dopts.bs.backoff(retries)
|
sleepTime := ac.dopts.bs.backoff(retries)
|
||||||
timeout := minConnectTimeout
|
timeout := minConnectTimeout
|
||||||
if timeout < sleepTime {
|
if timeout < sleepTime {
|
||||||
|
@ -883,7 +899,6 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
|
||||||
ac.ready = nil
|
ac.ready = nil
|
||||||
}
|
}
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
closeTransport = false
|
|
||||||
timer := time.NewTimer(sleepTime - time.Since(connectTime))
|
timer := time.NewTimer(sleepTime - time.Since(connectTime))
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
|
@ -936,19 +951,25 @@ func (ac *addrConn) transportMonitor() {
|
||||||
return
|
return
|
||||||
case <-t.GoAway():
|
case <-t.GoAway():
|
||||||
ac.adjustParams(t.GetGoAwayReason())
|
ac.adjustParams(t.GetGoAwayReason())
|
||||||
// If GoAway happens without any network I/O error, ac is closed without shutting down the
|
// If GoAway happens without any network I/O error, the underlying transport
|
||||||
// underlying transport (the transport will be closed when all the pending RPCs finished or
|
// will be gracefully closed, and a new transport will be created.
|
||||||
// failed.).
|
// (The transport will be closed when all the pending RPCs finished or failed.)
|
||||||
// If GoAway and some network I/O error happen concurrently, ac and its underlying transport
|
// If GoAway and some network I/O error happen concurrently, the underlying transport
|
||||||
// are closed.
|
// will be closed, and a new transport will be created.
|
||||||
// In both cases, a new ac is created.
|
var drain bool
|
||||||
select {
|
select {
|
||||||
case <-t.Error():
|
case <-t.Error():
|
||||||
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
|
|
||||||
default:
|
default:
|
||||||
ac.cc.resetAddrConn(ac.addr, false, errConnDrain)
|
drain = true
|
||||||
|
}
|
||||||
|
if err := ac.resetTransport(drain); err != nil {
|
||||||
|
grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
|
||||||
|
if err != errConnClosing {
|
||||||
|
// Keep this ac in cc.conns, to get the reason it's torn down.
|
||||||
|
ac.tearDown(err)
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
return
|
|
||||||
case <-t.Error():
|
case <-t.Error():
|
||||||
select {
|
select {
|
||||||
case <-ac.ctx.Done():
|
case <-ac.ctx.Done():
|
||||||
|
@ -956,8 +977,14 @@ func (ac *addrConn) transportMonitor() {
|
||||||
return
|
return
|
||||||
case <-t.GoAway():
|
case <-t.GoAway():
|
||||||
ac.adjustParams(t.GetGoAwayReason())
|
ac.adjustParams(t.GetGoAwayReason())
|
||||||
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
|
if err := ac.resetTransport(false); err != nil {
|
||||||
return
|
grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
|
||||||
|
if err != errConnClosing {
|
||||||
|
// Keep this ac in cc.conns, to get the reason it's torn down.
|
||||||
|
ac.tearDown(err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
ac.mu.Lock()
|
ac.mu.Lock()
|
||||||
|
@ -969,7 +996,8 @@ func (ac *addrConn) transportMonitor() {
|
||||||
ac.state = TransientFailure
|
ac.state = TransientFailure
|
||||||
ac.stateCV.Broadcast()
|
ac.stateCV.Broadcast()
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
if err := ac.resetTransport(true); err != nil {
|
if err := ac.resetTransport(false); err != nil {
|
||||||
|
grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
|
||||||
ac.mu.Lock()
|
ac.mu.Lock()
|
||||||
ac.printf("transport exiting: %v", err)
|
ac.printf("transport exiting: %v", err)
|
||||||
ac.mu.Unlock()
|
ac.mu.Unlock()
|
||||||
|
|
Loading…
Reference in New Issue