mirror of https://github.com/grpc/grpc-go.git

client: support a 1:1 mapping with acbws and addrConns (#6302)

parent 2a266e78a0
commit e9799e79db

balancer_conn_wrappers.go
@@ -133,19 +133,6 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
 // updateSubConnState is invoked by grpc to push a subConn state update to the
 // underlying balancer.
 func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
-	// When updating addresses for a SubConn, if the address in use is not in
-	// the new addresses, the old ac will be tearDown() and a new ac will be
-	// created. tearDown() generates a state change with Shutdown state, we
-	// don't want the balancer to receive this state change. So before
-	// tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
-	// this function will be called with (nil, Shutdown). We don't need to call
-	// balancer method in this case.
-	//
-	// TODO: Suppress the above mentioned state change to Shutdown, so we don't
-	// have to handle it here.
-	if sc == nil {
-		return
-	}
 	ccb.mu.Lock()
 	ccb.serializer.Schedule(func(_ context.Context) {
 		ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err})

@@ -315,7 +302,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
 		return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle")
 	}
 
-	if len(addrs) <= 0 {
+	if len(addrs) == 0 {
 		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
 	}
 	ac, err := ccb.cc.newAddrConn(addrs, opts)

@@ -324,9 +311,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
 		return nil, err
 	}
 	acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)}
-	acbw.ac.mu.Lock()
 	ac.acbw = acbw
-	acbw.ac.mu.Unlock()
 	return acbw, nil
 }

@@ -347,7 +332,7 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
 	if !ok {
 		return
 	}
-	ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
+	ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
 }
 
 func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {

@@ -391,63 +376,24 @@ func (ccb *ccBalancerWrapper) Target() string {
 // acBalancerWrapper is a wrapper on top of ac for balancers.
 // It implements balancer.SubConn interface.
 type acBalancerWrapper struct {
+	ac *addrConn // read-only
+
 	mu        sync.Mutex
-	ac        *addrConn
 	producers map[balancer.ProducerBuilder]*refCountedProducer
 }
 
+func (acbw *acBalancerWrapper) String() string {
+	return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int())
+}
+
 func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
-	acbw.mu.Lock()
-	defer acbw.mu.Unlock()
-	if len(addrs) <= 0 {
-		acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
-		return
-	}
-	if !acbw.ac.tryUpdateAddrs(addrs) {
-		cc := acbw.ac.cc
-		opts := acbw.ac.scopts
-		acbw.ac.mu.Lock()
-		// Set old ac.acbw to nil so the Shutdown state update will be ignored
-		// by balancer.
-		//
-		// TODO(bar) the state transition could be wrong when tearDown() old ac
-		// and creating new ac, fix the transition.
-		acbw.ac.acbw = nil
-		acbw.ac.mu.Unlock()
-		acState := acbw.ac.getState()
-		acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
-
-		if acState == connectivity.Shutdown {
-			return
-		}
-
-		newAC, err := cc.newAddrConn(addrs, opts)
-		if err != nil {
-			channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
-			return
-		}
-		acbw.ac = newAC
-		newAC.mu.Lock()
-		newAC.acbw = acbw
-		newAC.mu.Unlock()
-		if acState != connectivity.Idle {
-			go newAC.connect()
-		}
-	}
+	acbw.ac.updateAddrs(addrs)
 }
 
 func (acbw *acBalancerWrapper) Connect() {
-	acbw.mu.Lock()
-	defer acbw.mu.Unlock()
 	go acbw.ac.connect()
 }
 
-func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
-	acbw.mu.Lock()
-	defer acbw.mu.Unlock()
-	return acbw.ac
-}
-
 // NewStream begins a streaming RPC on the addrConn. If the addrConn is not
 // ready, blocks until it is or ctx expires. Returns an error when the context
 // expires or the addrConn is shut down.
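
The struct change above is the core of this commit: ac moves out from under acbw.mu and becomes a read-only field, because each wrapper now maps to exactly one addrConn for its entire lifetime. A minimal sketch of the before/after designs (the conn and wrapper types below are hypothetical stand-ins, not the grpc-go types) shows why the mutex around the delegate disappears:

package main

import (
	"fmt"
	"sync"
)

type conn struct{ id int }

// Before: the wrapped conn could be swapped at runtime (a failed
// tryUpdateAddrs tore down the old conn and installed a new one), so every
// reader had to take the mutex.
type swappableWrapper struct {
	mu sync.Mutex
	c  *conn
}

func (w *swappableWrapper) get() *conn {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.c
}

// After: the conn is fixed at construction time (the 1:1 mapping), so the
// field can be documented read-only and accessed without locking.
type fixedWrapper struct {
	c *conn // read-only after construction
}

func (w *fixedWrapper) get() *conn { return w.c }

func main() {
	sw := &swappableWrapper{c: &conn{id: 1}}
	fw := &fixedWrapper{c: &conn{id: 2}}
	fmt.Println(sw.get().id, fw.get().id)
}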

clientconn.go
@@ -24,7 +24,6 @@ import (
 	"fmt"
 	"math"
 	"net/url"
-	"reflect"
 	"strings"
 	"sync"
 	"sync/atomic"

@@ -970,9 +969,6 @@ func (ac *addrConn) connect() error {
 		ac.mu.Unlock()
 		return nil
 	}
-	// Update connectivity state within the lock to prevent subsequent or
-	// concurrent calls from resetting the transport more than once.
-	ac.updateConnectivityState(connectivity.Connecting, nil)
 	ac.mu.Unlock()
 
 	ac.resetTransport()

@@ -991,58 +987,60 @@ func equalAddresses(a, b []resolver.Address) bool {
 	return true
 }
 
-// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
-//
-// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
-// addresses will be picked up by retry in the next iteration after backoff.
-//
-// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
-//
-// If the addresses is the same as the old list, it does nothing and returns
-// true.
-//
-// If ac is Connecting, it returns false. The caller should tear down the ac and
-// create a new one. Note that the backoff will be reset when this happens.
-//
-// If ac is Ready, it checks whether current connected address of ac is in the
-// new addrs list.
-//  - If true, it updates ac.addrs and returns true. The ac will keep using
-//    the existing connection.
-//  - If false, it does nothing and returns false.
-func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
+// updateAddrs updates ac.addrs with the new addresses list and handles active
+// connections or connection attempts.
+func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
 	ac.mu.Lock()
-	defer ac.mu.Unlock()
-	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
+	channelz.Infof(logger, ac.channelzID, "addrConn: updateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
+
+	if equalAddresses(ac.addrs, addrs) {
+		ac.mu.Unlock()
+		return
+	}
+
+	ac.addrs = addrs
+
 	if ac.state == connectivity.Shutdown ||
 		ac.state == connectivity.TransientFailure ||
 		ac.state == connectivity.Idle {
-		ac.addrs = addrs
-		return true
+		// We were not connecting, so do nothing but update the addresses.
+		ac.mu.Unlock()
+		return
 	}
 
-	if equalAddresses(ac.addrs, addrs) {
-		return true
-	}
-
-	if ac.state == connectivity.Connecting {
-		return false
-	}
-
-	// ac.state is Ready, try to find the connected address.
-	var curAddrFound bool
-	for _, a := range addrs {
-		a.ServerName = ac.cc.getServerName(a)
-		if reflect.DeepEqual(ac.curAddr, a) {
-			curAddrFound = true
-			break
+	if ac.state == connectivity.Ready {
+		// Try to find the connected address.
+		for _, a := range addrs {
+			a.ServerName = ac.cc.getServerName(a)
+			if a.Equal(ac.curAddr) {
+				// We are connected to a valid address, so do nothing but
+				// update the addresses.
+				ac.mu.Unlock()
+				return
+			}
 		}
 	}
-	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
-	if curAddrFound {
-		ac.addrs = addrs
-	}
 
-	return curAddrFound
+	// We are either connected to the wrong address or currently connecting.
+	// Stop the current iteration and restart.
+
+	ac.cancel()
+	ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)
+
+	// We have to defer here because GracefulClose => Close => onClose, which
+	// requires locking ac.mu.
+	defer ac.transport.GracefulClose()
+	ac.transport = nil
+
+	if len(addrs) == 0 {
+		ac.updateConnectivityState(connectivity.Idle, nil)
+	}
+
+	ac.mu.Unlock()
+
+	// Since we were connecting/connected, we should start a new connection
+	// attempt.
+	go ac.resetTransport()
 }
 
 // getServerName determines the serverName to be used in the connection
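
The new updateAddrs aborts an in-flight connection attempt without tearing down the addrConn: it cancels ac.ctx, immediately derives a fresh context from the parent channel's context, and restarts resetTransport. Below is a self-contained sketch of that cancel-and-recreate pattern, with a hypothetical dialer type and attempt method standing in for addrConn and resetTransport:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// dialer owns a restartable background attempt, the way addrConn owns its
// transport-connection loop.
type dialer struct {
	mu     sync.Mutex
	parent context.Context
	ctx    context.Context
	cancel context.CancelFunc
}

func newDialer(parent context.Context) *dialer {
	ctx, cancel := context.WithCancel(parent)
	return &dialer{parent: parent, ctx: ctx, cancel: cancel}
}

// attempt runs until its snapshot context is canceled. Checking ctx.Err()
// rather than shared state is what lets a restart abort only the old
// iteration.
func (d *dialer) attempt(ctx context.Context, target string) {
	for ctx.Err() == nil {
		fmt.Println("dialing", target)
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("attempt for", target, "stopped:", ctx.Err())
}

// restart mirrors updateAddrs: cancel the old iteration, install a new
// context derived from the parent, and start a new attempt.
func (d *dialer) restart(target string) {
	d.mu.Lock()
	d.cancel()
	d.ctx, d.cancel = context.WithCancel(d.parent)
	ctx := d.ctx // snapshot under the lock, as resetTransport does
	d.mu.Unlock()
	go d.attempt(ctx, target)
}

func main() {
	d := newDialer(context.Background())
	d.restart("addr-1")
	time.Sleep(30 * time.Millisecond)
	d.restart("addr-2") // aborts the addr-1 attempt, starts addr-2
	time.Sleep(30 * time.Millisecond)
	d.cancel()
	time.Sleep(20 * time.Millisecond)
}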

@@ -1301,7 +1299,8 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
 
 func (ac *addrConn) resetTransport() {
 	ac.mu.Lock()
-	if ac.state == connectivity.Shutdown {
+	acCtx := ac.ctx
+	if acCtx.Err() != nil {
 		ac.mu.Unlock()
 		return
 	}

@@ -1329,15 +1328,14 @@ func (ac *addrConn) resetTransport() {
 	ac.updateConnectivityState(connectivity.Connecting, nil)
 	ac.mu.Unlock()
 
-	if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
+	if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
 		ac.cc.resolveNow(resolver.ResolveNowOptions{})
 		// After exhausting all addresses, the addrConn enters
 		// TRANSIENT_FAILURE.
-		ac.mu.Lock()
-		if ac.state == connectivity.Shutdown {
-			ac.mu.Unlock()
+		if acCtx.Err() != nil {
 			return
 		}
+		ac.mu.Lock()
 		ac.updateConnectivityState(connectivity.TransientFailure, err)
 
 		// Backoff.

@@ -1352,13 +1350,13 @@ func (ac *addrConn) resetTransport() {
 			ac.mu.Unlock()
 		case <-b:
 			timer.Stop()
-		case <-ac.ctx.Done():
+		case <-acCtx.Done():
 			timer.Stop()
 			return
 		}
 
 		ac.mu.Lock()
-		if ac.state != connectivity.Shutdown {
+		if acCtx.Err() == nil {
 			ac.updateConnectivityState(connectivity.Idle, err)
 		}
 		ac.mu.Unlock()
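
The backoff wait above now selects on the snapshot acCtx rather than re-reading ac.ctx, so an address update that swaps the context reliably interrupts the sleep. A sketch of that cancellable-backoff shape (waitBackoff and the durations are illustrative, not grpc-go API):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitBackoff sleeps for d but returns early (false) if ctx is canceled,
// mirroring the select on the backoff timer vs. acCtx.Done().
func waitBackoff(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(20 * time.Millisecond)
		cancel() // e.g. updateAddrs restarting the attempt
	}()
	if !waitBackoff(ctx, time.Second) {
		fmt.Println("backoff interrupted:", ctx.Err())
	}
}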

@@ -1373,14 +1371,13 @@ func (ac *addrConn) resetTransport() {
 // tryAllAddrs tries to creates a connection to the addresses, and stop when at
 // the first successful one. It returns an error if no address was successfully
 // connected, or updates ac appropriately with the new transport.
-func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
+func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
 	var firstConnErr error
 	for _, addr := range addrs {
-		ac.mu.Lock()
-		if ac.state == connectivity.Shutdown {
-			ac.mu.Unlock()
+		if ctx.Err() != nil {
 			return errConnClosing
 		}
+		ac.mu.Lock()
 
 		ac.cc.mu.RLock()
 		ac.dopts.copts.KeepaliveParams = ac.cc.mkp

@@ -1394,7 +1391,7 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
 
 		channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
 
-		err := ac.createTransport(addr, copts, connectDeadline)
+		err := ac.createTransport(ctx, addr, copts, connectDeadline)
 		if err == nil {
 			return nil
 		}

@@ -1411,19 +1408,20 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T
 // createTransport creates a connection to addr. It returns an error if the
 // address was not successfully connected, or updates ac appropriately with the
 // new transport.
-func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
+func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
 	addr.ServerName = ac.cc.getServerName(addr)
-	hctx, hcancel := context.WithCancel(ac.ctx)
+	hctx, hcancel := context.WithCancel(ctx)
 
 	onClose := func(r transport.GoAwayReason) {
 		ac.mu.Lock()
 		defer ac.mu.Unlock()
 		// adjust params based on GoAwayReason
 		ac.adjustParams(r)
-		if ac.state == connectivity.Shutdown {
-			// Already shut down. tearDown() already cleared the transport and
-			// canceled hctx via ac.ctx, and we expected this connection to be
-			// closed, so do nothing here.
+		if ctx.Err() != nil {
+			// Already shut down or connection attempt canceled. tearDown() or
+			// updateAddrs() already cleared the transport and canceled hctx
+			// via ac.ctx, and we expected this connection to be closed, so do
+			// nothing here.
 			return
 		}
 		hcancel()
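
With the attempt context threaded through, createTransport derives both the health-check context and the connect deadline from ctx, the per-attempt snapshot, rather than from ac.ctx, so canceling one attempt also cancels everything that attempt spawned. A small sketch of that derivation chain (all names are illustrative):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	attempt, cancelAttempt := context.WithCancel(context.Background())

	// Per-attempt children, as in createTransport: a cancelable health-check
	// context and a deadline-bounded connect context.
	hctx, hcancel := context.WithCancel(attempt)
	defer hcancel()
	connectCtx, cancel := context.WithDeadline(attempt, time.Now().Add(time.Second))
	defer cancel()

	cancelAttempt() // aborting the attempt tears down both children
	fmt.Println(hctx.Err(), connectCtx.Err())
}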

@@ -1442,7 +1440,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
 		ac.updateConnectivityState(connectivity.Idle, nil)
 	}
 
-	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
+	connectCtx, cancel := context.WithDeadline(ctx, connectDeadline)
 	defer cancel()
 	copts.ChannelzParentID = ac.channelzID

@@ -1459,7 +1457,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
 
 	ac.mu.Lock()
 	defer ac.mu.Unlock()
-	if ac.state == connectivity.Shutdown {
+	if ctx.Err() != nil {
 		// This can happen if the subConn was removed while in `Connecting`
 		// state. tearDown() would have set the state to `Shutdown`, but
 		// would not have closed the transport since ac.transport would not

@@ -1471,6 +1469,9 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
 		// The error we pass to Close() is immaterial since there are no open
 		// streams at this point, so no trailers with error details will be sent
 		// out. We just need to pass a non-nil error.
+		//
+		// This can also happen when updateAddrs is called during a connection
+		// attempt.
 		go newTr.Close(transport.ErrConnClosing)
 		return nil
 	}
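
The comment added in the last hunk names a race that the attempt context makes explicit: a dial can succeed just as its attempt is canceled by updateAddrs, in which case the fresh transport must be discarded. A sketch of that check-after-success shape, assuming hypothetical dial and transport types:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type transport struct{}

func (t *transport) Close(err error) { fmt.Println("closing new transport:", err) }

var errConnClosing = errors.New("conn is closing")

func dial(ctx context.Context, addr string) (*transport, error) {
	// Pretend the handshake succeeded despite cancellation racing it.
	return &transport{}, nil
}

func createTransport(ctx context.Context, addr string) error {
	tr, err := dial(ctx, addr)
	if err != nil {
		return err
	}
	if ctx.Err() != nil {
		// Attempt canceled (tearDown or an address update) while the
		// handshake was in flight: discard the otherwise-good transport.
		go tr.Close(errConnClosing)
		return nil
	}
	fmt.Println("transport installed for", addr)
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_ = createTransport(ctx, "10.0.0.1:443")
	time.Sleep(10 * time.Millisecond)
}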

picker_wrapper.go
@@ -68,10 +68,8 @@ func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
 // - wraps the done function in the passed in result to increment the calls
 // failed or calls succeeded channelz counter before invoking the actual
 // done function.
-func doneChannelzWrapper(acw *acBalancerWrapper, result *balancer.PickResult) {
-	acw.mu.Lock()
-	ac := acw.ac
-	acw.mu.Unlock()
+func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
+	ac := acbw.ac
 	ac.incrCallsStarted()
 	done := result.Done
 	result.Done = func(b balancer.DoneInfo) {

@@ -157,14 +155,14 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
 			return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
 		}
 
-		acw, ok := pickResult.SubConn.(*acBalancerWrapper)
+		acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
 		if !ok {
 			logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
 			continue
 		}
-		if t := acw.getAddrConn().getReadyTransport(); t != nil {
+		if t := acbw.ac.getReadyTransport(); t != nil {
 			if channelz.IsOn() {
-				doneChannelzWrapper(acw, &pickResult)
+				doneChannelzWrapper(acbw, &pickResult)
 				return t, pickResult, nil
 			}
 			return t, pickResult, nil
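
With the field immutable, the pick path reads acbw.ac directly after the type assertion instead of going through the locked getAddrConn accessor. A minimal sketch of asserting an interface back to its concrete wrapper (SubConn, conn and wrapper here are hypothetical stand-ins for the balancer types):

package main

import "fmt"

type SubConn interface{ Connect() }

type conn struct{ addr string }

type wrapper struct{ c *conn } // c is read-only, so no lock is needed

func (w *wrapper) Connect() {}

func pickTransport(sc SubConn) (string, bool) {
	w, ok := sc.(*wrapper)
	if !ok {
		// Defensive: a custom balancer could hand back a foreign SubConn.
		return "", false
	}
	return w.c.addr, true
}

func main() {
	var sc SubConn = &wrapper{c: &conn{addr: "10.0.0.1:443"}}
	fmt.Println(pickTransport(sc))
}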

stream.go
@@ -1273,14 +1273,19 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
 	as.p = &parser{r: s}
 	ac.incrCallsStarted()
 	if desc != unaryStreamDesc {
-		// Listen on cc and stream contexts to cleanup when the user closes the
-		// ClientConn or cancels the stream context. In all other cases, an error
-		// should already be injected into the recv buffer by the transport, which
-		// the client will eventually receive, and then we will cancel the stream's
-		// context in clientStream.finish.
+		// Listen on stream context to cleanup when the stream context is
+		// canceled. Also listen for the addrConn's context in case the
+		// addrConn is closed or reconnects to a different address. In all
+		// other cases, an error should already be injected into the recv
+		// buffer by the transport, which the client will eventually receive,
+		// and then we will cancel the stream's context in
+		// addrConnStream.finish.
 		go func() {
+			ac.mu.Lock()
+			acCtx := ac.ctx
+			ac.mu.Unlock()
 			select {
-			case <-ac.ctx.Done():
+			case <-acCtx.Done():
 				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
 			case <-ctx.Done():
 				as.finish(toRPCErr(ctx.Err()))
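
Because updateAddrs can now replace ac.ctx mid-flight, the stream goroutine snapshots the context once under ac.mu and selects on the snapshot; re-reading ac.ctx inside the select would race with the swap. The shape of that snapshot-then-select pattern in a stand-alone sketch (the holder type is hypothetical):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// holder owns a context that another goroutine may swap, the way addrConn's
// ctx is swapped by updateAddrs.
type holder struct {
	mu  sync.Mutex
	ctx context.Context
}

func (h *holder) watch(streamCtx context.Context) {
	// Snapshot under the lock; never touch h.ctx again from this goroutine.
	h.mu.Lock()
	ctx := h.ctx
	h.mu.Unlock()

	select {
	case <-ctx.Done():
		fmt.Println("subconn closing:", ctx.Err())
	case <-streamCtx.Done():
		fmt.Println("stream canceled:", streamCtx.Err())
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	h := &holder{ctx: ctx}
	streamCtx, streamCancel := context.WithCancel(context.Background())
	defer streamCancel()

	go h.watch(streamCtx)
	time.Sleep(10 * time.Millisecond)
	cancel() // the old snapshot still observes this cancellation
	time.Sleep(10 * time.Millisecond)
}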