mirror of https://github.com/grpc/grpc-go.git
client: handle RemoveSubConn in goroutine to avoid deadlock (#4504)
This commit is contained in:
parent
174b1c28af
commit
0956b12520
|
|
@ -43,7 +43,7 @@ type ccBalancerWrapper struct {
|
|||
cc *ClientConn
|
||||
balancerMu sync.Mutex // synchronizes calls to the balancer
|
||||
balancer balancer.Balancer
|
||||
scBuffer *buffer.Unbounded
|
||||
updateCh *buffer.Unbounded
|
||||
closed *grpcsync.Event
|
||||
done *grpcsync.Event
|
||||
|
||||
|
|
@ -54,7 +54,7 @@ type ccBalancerWrapper struct {
|
|||
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
|
||||
ccb := &ccBalancerWrapper{
|
||||
cc: cc,
|
||||
scBuffer: buffer.NewUnbounded(),
|
||||
updateCh: buffer.NewUnbounded(),
|
||||
closed: grpcsync.NewEvent(),
|
||||
done: grpcsync.NewEvent(),
|
||||
subConns: make(map[*acBalancerWrapper]struct{}),
|
||||
|
|
@ -69,15 +69,26 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
|
|||
func (ccb *ccBalancerWrapper) watcher() {
|
||||
for {
|
||||
select {
|
||||
case t := <-ccb.scBuffer.Get():
|
||||
ccb.scBuffer.Load()
|
||||
case t := <-ccb.updateCh.Get():
|
||||
ccb.updateCh.Load()
|
||||
if ccb.closed.HasFired() {
|
||||
break
|
||||
}
|
||||
ccb.balancerMu.Lock()
|
||||
su := t.(*scStateUpdate)
|
||||
ccb.balancer.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err})
|
||||
ccb.balancerMu.Unlock()
|
||||
switch u := t.(type) {
|
||||
case *scStateUpdate:
|
||||
ccb.balancerMu.Lock()
|
||||
ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})
|
||||
ccb.balancerMu.Unlock()
|
||||
case *acBalancerWrapper:
|
||||
ccb.mu.Lock()
|
||||
if ccb.subConns != nil {
|
||||
delete(ccb.subConns, u)
|
||||
ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
|
||||
}
|
||||
ccb.mu.Unlock()
|
||||
default:
|
||||
logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
|
||||
}
|
||||
case <-ccb.closed.Done():
|
||||
}
|
||||
|
||||
|
|
@ -118,7 +129,7 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
|
|||
if sc == nil {
|
||||
return
|
||||
}
|
||||
ccb.scBuffer.Put(&scStateUpdate{
|
||||
ccb.updateCh.Put(&scStateUpdate{
|
||||
sc: sc,
|
||||
state: s,
|
||||
err: err,
|
||||
|
|
@ -159,17 +170,10 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
|
|||
}
|
||||
|
||||
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
|
||||
acbw, ok := sc.(*acBalancerWrapper)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
ccb.mu.Lock()
|
||||
defer ccb.mu.Unlock()
|
||||
if ccb.subConns == nil {
|
||||
return
|
||||
}
|
||||
delete(ccb.subConns, acbw)
|
||||
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
|
||||
// The RemoveSubConn() is handled in the run() goroutine, to avoid deadlock
|
||||
// during switchBalancer() if the old balancer calls RemoveSubConn() in its
|
||||
// Close().
|
||||
ccb.updateCh.Put(sc)
|
||||
}
|
||||
|
||||
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import (
|
|||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/balancer/roundrobin"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/balancer/stub"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/resolver/manual"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
|
|
@ -531,6 +532,51 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
const inlineRemoveSubConnBalancerName = "test-inline-remove-subconn-balancer"
|
||||
|
||||
func init() {
|
||||
stub.Register(inlineRemoveSubConnBalancerName, stub.BalancerFuncs{
|
||||
Close: func(data *stub.BalancerData) {
|
||||
data.ClientConn.RemoveSubConn(&acBalancerWrapper{})
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// Test that when switching to balancers, the old balancer calls RemoveSubConn
|
||||
// in Close.
|
||||
//
|
||||
// This test is to make sure this close doesn't cause a deadlock.
|
||||
func (s) TestSwitchBalancerOldRemoveSubConn(t *testing.T) {
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to dial: %v", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, inlineRemoveSubConnBalancerName))}, nil)
|
||||
// This service config update will switch balancer from
|
||||
// "test-inline-remove-subconn-balancer" to "pick_first". The test balancer
|
||||
// will be closed, which will call cc.RemoveSubConn() inline (this
|
||||
// RemoveSubConn is not required by the API, but some balancers might do
|
||||
// it).
|
||||
//
|
||||
// This is to make sure the cc.RemoveSubConn() from Close() doesn't cause a
|
||||
// deadlock (e.g. trying to grab a mutex while it's already locked).
|
||||
//
|
||||
// Do it in a goroutine so this test will fail with a helpful message
|
||||
// (though the goroutine will still leak).
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`)}, nil)
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-time.After(defaultTestTimeout):
|
||||
t.Fatalf("timeout waiting for updateResolverState to finish")
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
|
||||
func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
|
||||
scpr := r.CC.ParseServiceConfig(s)
|
||||
if scpr.Err != nil {
|
||||
|
|
|
|||
Loading…
Reference in New Issue