From b1f7648a9fc72ce76cbcd42d8e2c60d9d9bed9fc Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Fri, 21 May 2021 15:15:58 -0700 Subject: [PATCH] client: ensure LB policy is closed before closing resolver (#4478) --- balancer_conn_wrappers.go | 19 ++++++++++++++----- clientconn.go | 6 +++--- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 4cc7f9159..f1bb6dd30 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -44,6 +44,7 @@ type ccBalancerWrapper struct { balancerMu sync.Mutex // synchronizes calls to the balancer balancer balancer.Balancer scBuffer *buffer.Unbounded + closed *grpcsync.Event done *grpcsync.Event mu sync.Mutex @@ -54,6 +55,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui ccb := &ccBalancerWrapper{ cc: cc, scBuffer: buffer.NewUnbounded(), + closed: grpcsync.NewEvent(), done: grpcsync.NewEvent(), subConns: make(map[*acBalancerWrapper]struct{}), } @@ -69,33 +71,40 @@ func (ccb *ccBalancerWrapper) watcher() { select { case t := <-ccb.scBuffer.Get(): ccb.scBuffer.Load() - if ccb.done.HasFired() { + if ccb.closed.HasFired() { break } ccb.balancerMu.Lock() su := t.(*scStateUpdate) ccb.balancer.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err}) ccb.balancerMu.Unlock() - case <-ccb.done.Done(): + case <-ccb.closed.Done(): } - if ccb.done.HasFired() { + if ccb.closed.HasFired() { + ccb.balancerMu.Lock() ccb.balancer.Close() + ccb.balancerMu.Unlock() ccb.mu.Lock() scs := ccb.subConns ccb.subConns = nil ccb.mu.Unlock() + ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil}) + ccb.done.Fire() + // Fire done before removing the addr conns. We can safely unblock + // ccb.close and allow the removeAddrConns to happen + // asynchronously. for acbw := range scs { ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) } - ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil}) return } } } func (ccb *ccBalancerWrapper) close() { - ccb.done.Fire() + ccb.closed.Fire() + <-ccb.done.Done() } func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { diff --git a/clientconn.go b/clientconn.go index 24109264f..0236c81c4 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1046,12 +1046,12 @@ func (cc *ClientConn) Close() error { cc.blockingpicker.close() - if rWrapper != nil { - rWrapper.close() - } if bWrapper != nil { bWrapper.close() } + if rWrapper != nil { + rWrapper.close() + } for ac := range conns { ac.tearDown(ErrClientConnClosing)