balancergroup: Make closing terminal (#8095)

Arjan Singh Bal 2025-02-25 11:29:05 +05:30 committed by GitHub
parent e0ac3acff4
commit aa629e0ef3
GPG Key ID: B5690EEEBB952194
6 changed files with 60 additions and 117 deletions
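For orientation, here is a rough caller-side sketch of the BalancerGroup lifecycle after this change, distilled from the hunks below. It is not code from this commit: it assumes the import path google.golang.org/grpc/internal/balancergroup (the package is internal, so only in-tree LB policies use it), uses "round_robin" as a stand-in child policy name, and sets only the Options fields that actually appear in these diffs.

package parentpolicy // sketch only: a hypothetical in-tree parent LB policy

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/internal/balancergroup" // assumed import path
)

func lifecycle(cc balancer.ClientConn, s balancer.ClientConnState) {
	// New returns a group that is immediately usable; there is no Start() anymore.
	bg := balancergroup.New(balancergroup.Options{
		StateAggregator:         nil, // a real parent supplies its own aggregator
		Logger:                  nil,
		SubBalancerCloseTimeout: 0, // disable caching of removed child policies
	})

	// Children are built and started as soon as they are added.
	_ = bg.AddWithClientConn("child-1", "round_robin", cc) // child policy name is illustrative
	_ = bg.UpdateClientConnState("child-1", s)

	// Close is terminal: children are stopped, their SubConns are shut down,
	// and the group cannot be restarted.
	bg.Close()
}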

View File

@@ -148,7 +148,6 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
Logger: lb.logger,
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
})
lb.bg.Start()
go lb.run()
return lb
}

View File

@@ -62,7 +62,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
Logger: b.logger,
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
})
b.bg.Start()
b.logger.Infof("Created")
return b
}

View File

@@ -194,7 +194,7 @@ type BalancerGroup struct {
// The corresponding boolean outgoingStarted is used to stop further updates
// to sub-balancers after they are closed.
outgoingMu sync.Mutex
outgoingStarted bool
outgoingClosed bool
idToBalancerConfig map[string]*subBalancerWrapper
// Cache for sub-balancers when they are removed. This is `nil` if caching
// is disabled by passing `0` for `Options.SubBalancerCloseTimeout`.
@@ -224,7 +224,7 @@ type BalancerGroup struct {
// The corresponding boolean incomingStarted is used to stop further updates
// from sub-balancers after they are closed.
incomingMu sync.Mutex
incomingStarted bool // This boolean only guards calls back to ClientConn.
incomingClosed bool // This boolean only guards calls back to ClientConn.
scToSubBalancer map[balancer.SubConn]*subBalancerWrapper
}
@@ -265,30 +265,6 @@ func New(opts Options) *BalancerGroup {
}
}
// Start starts the balancer group, including building all the sub-balancers,
// and sending the existing addresses to them.
//
// A BalancerGroup can be closed and started later. When a BalancerGroup is
// closed, it can still receive address updates, which will be applied when
// restarted.
func (bg *BalancerGroup) Start() {
bg.incomingMu.Lock()
bg.incomingStarted = true
bg.incomingMu.Unlock()
bg.outgoingMu.Lock()
if bg.outgoingStarted {
bg.outgoingMu.Unlock()
return
}
for _, config := range bg.idToBalancerConfig {
config.startBalancer()
}
bg.outgoingStarted = true
bg.outgoingMu.Unlock()
}
// AddWithClientConn adds a balancer with the given id to the group. The
// balancer is built with a balancer builder registered with balancerName. The
// given ClientConn is passed to the newly built balancer instead of the
@@ -299,17 +275,18 @@ func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.
bg.logger.Infof("Adding child policy of type %q for child %q", balancerName, id)
builder := balancer.Get(balancerName)
if builder == nil {
return fmt.Errorf("unregistered balancer name %q", balancerName)
return fmt.Errorf("balancergroup: unregistered balancer name %q", balancerName)
}
// Store data in static map, and then check to see if bg is started.
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return fmt.Errorf("balancergroup: already closed")
}
var sbc *subBalancerWrapper
// If outgoingStarted is true, search in the cache. Otherwise, cache is
// guaranteed to be empty, searching is unnecessary. Also, skip the cache if
// caching is disabled.
if bg.outgoingStarted && bg.deletedBalancerCache != nil {
// Skip searching the cache if disabled.
if bg.deletedBalancerCache != nil {
if old, ok := bg.deletedBalancerCache.Remove(id); ok {
if bg.logger.V(2) {
bg.logger.Infof("Removing and reusing child policy of type %q for child %q from the balancer cache", balancerName, id)
@@ -341,11 +318,7 @@ func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.
builder: builder,
buildOpts: bg.buildOpts,
}
if bg.outgoingStarted {
// Only start the balancer if bg is started. Otherwise, we only keep the
// static data.
sbc.startBalancer()
}
sbc.startBalancer()
} else {
// When bringing back a sub-balancer from cache, re-send the cached
// picker and state.
@@ -369,6 +342,10 @@ func (bg *BalancerGroup) Remove(id string) {
bg.logger.Infof("Removing child policy for child %q", id)
bg.outgoingMu.Lock()
if bg.outgoingClosed {
bg.outgoingMu.Unlock()
return
}
sbToRemove, ok := bg.idToBalancerConfig[id]
if !ok {
@@ -379,12 +356,6 @@ func (bg *BalancerGroup) Remove(id string) {
// Unconditionally remove the sub-balancer config from the map.
delete(bg.idToBalancerConfig, id)
if !bg.outgoingStarted {
// Nothing needs to be done here, since we wouldn't have created the
// sub-balancer.
bg.outgoingMu.Unlock()
return
}
if bg.deletedBalancerCache != nil {
if bg.logger.V(2) {
@@ -424,6 +395,7 @@ func (bg *BalancerGroup) Remove(id string) {
// cleanup after the timeout.
func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) {
bg.incomingMu.Lock()
defer bg.incomingMu.Unlock()
// Remove SubConns. This is only done after the balancer is
// actually closed.
//
@@ -437,18 +409,20 @@ func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) {
delete(bg.scToSubBalancer, sc)
}
}
bg.incomingMu.Unlock()
}
// connect attempts to connect to all subConns belonging to sb.
func (bg *BalancerGroup) connect(sb *subBalancerWrapper) {
bg.incomingMu.Lock()
defer bg.incomingMu.Unlock()
if bg.incomingClosed {
return
}
for sc, b := range bg.scToSubBalancer {
if b == sb {
sc.Connect()
}
}
bg.incomingMu.Unlock()
}
// Following are actions from the parent grpc.ClientConn, forward to sub-balancers.
@@ -457,6 +431,10 @@ func (bg *BalancerGroup) connect(sb *subBalancerWrapper) {
// needed.
func (bg *BalancerGroup) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
bg.incomingMu.Lock()
if bg.incomingClosed {
bg.incomingMu.Unlock()
return
}
if _, ok := bg.scToSubBalancer[sc]; !ok {
bg.incomingMu.Unlock()
return
@@ -468,10 +446,13 @@ func (bg *BalancerGroup) updateSubConnState(sc balancer.SubConn, state balancer.
bg.incomingMu.Unlock()
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return
}
if cb != nil {
cb(state)
}
bg.outgoingMu.Unlock()
}
// UpdateSubConnState handles the state for the subconn. It finds the
@@ -485,6 +466,9 @@ func (bg *BalancerGroup) UpdateSubConnState(sc balancer.SubConn, state balancer.
func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnState) error {
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return nil
}
if config, ok := bg.idToBalancerConfig[id]; ok {
return config.updateClientConnState(s)
}
@@ -494,10 +478,13 @@ func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnS
// ResolverError forwards resolver errors to all sub-balancers.
func (bg *BalancerGroup) ResolverError(err error) {
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return
}
for _, config := range bg.idToBalancerConfig {
config.resolverError(err)
}
bg.outgoingMu.Unlock()
}
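Taken together, the guards here and in the hunks just below give a closed group two kinds of behavior: forwarding paths (UpdateClientConnState, ResolverError, SubConn state updates, ExitIdle) silently become no-ops, while paths that would create new state (AddWithClientConn above, NewSubConn below) fail with an explicit error. A small sketch of the observable behavior, under the same assumptions as the sketch near the top of this page:

package parentpolicy // sketch only; not code from this commit

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/internal/balancergroup" // assumed import path
)

func afterClose(bg *balancergroup.BalancerGroup, cc balancer.ClientConn, s balancer.ClientConnState) {
	bg.Close()

	// Forwarding paths are dropped without error once the group is closed.
	_ = bg.UpdateClientConnState("child-1", s)     // returns nil; the update is ignored
	bg.ResolverError(errors.New("resolver error")) // ignored
	bg.ExitIdle()                                  // ignored

	// Paths that would create new state report the closure instead.
	err := bg.AddWithClientConn("child-2", "round_robin", cc)
	fmt.Println(err) // balancergroup: already closed
}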
// Following are actions from sub-balancers, forward to ClientConn.
@@ -514,9 +501,9 @@ func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver
// error. But since we call balancer.stopBalancer when removing the balancer, this
// shouldn't happen.
bg.incomingMu.Lock()
if !bg.incomingStarted {
if bg.incomingClosed {
bg.incomingMu.Unlock()
return nil, fmt.Errorf("NewSubConn is called after balancer group is closed")
return nil, fmt.Errorf("balancergroup: NewSubConn is called after balancer group is closed")
}
var sc balancer.SubConn
oldListener := opts.StateListener
@@ -547,19 +534,23 @@ func (bg *BalancerGroup) updateBalancerState(id string, state balancer.State) {
}
// Close closes the balancer. It stops sub-balancers, and removes the subconns.
// The BalancerGroup can be restarted later.
// When a BalancerGroup is closed, it cannot receive further address updates.
func (bg *BalancerGroup) Close() {
bg.incomingMu.Lock()
if bg.incomingStarted {
bg.incomingStarted = false
// Also remove all SubConns.
for sc := range bg.scToSubBalancer {
sc.Shutdown()
delete(bg.scToSubBalancer, sc)
}
bg.incomingClosed = true
// Also remove all SubConns.
for sc := range bg.scToSubBalancer {
sc.Shutdown()
delete(bg.scToSubBalancer, sc)
}
bg.incomingMu.Unlock()
bg.outgoingMu.Lock()
// Setting `outgoingClosed` ensures that no entries are added to
// `deletedBalancerCache` after this point.
bg.outgoingClosed = true
bg.outgoingMu.Unlock()
// Clear(true) runs clear function to close sub-balancers in cache. It
// must be called out of outgoing mutex.
if bg.deletedBalancerCache != nil {
@@ -567,11 +558,9 @@ func (bg *BalancerGroup) Close() {
}
bg.outgoingMu.Lock()
if bg.outgoingStarted {
bg.outgoingStarted = false
for _, config := range bg.idToBalancerConfig {
config.stopBalancer()
}
for id, config := range bg.idToBalancerConfig {
config.stopBalancer()
delete(bg.idToBalancerConfig, id)
}
bg.outgoingMu.Unlock()
}
@@ -581,24 +570,30 @@ func (bg *BalancerGroup) Close() {
// not supported.
func (bg *BalancerGroup) ExitIdle() {
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return
}
for _, config := range bg.idToBalancerConfig {
if !config.exitIdle() {
bg.connect(config)
}
}
bg.outgoingMu.Unlock()
}
// ExitIdleOne instructs the sub-balancer `id` to exit IDLE state, if
// appropriate and possible.
func (bg *BalancerGroup) ExitIdleOne(id string) {
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if bg.outgoingClosed {
return
}
if config := bg.idToBalancerConfig[id]; config != nil {
if !config.exitIdle() {
bg.connect(config)
}
}
bg.outgoingMu.Unlock()
}
// ParseConfig parses a child config list and returns a LB config for the

View File

@@ -67,15 +67,12 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
// Create a new balancer group, add balancer and backends, but not start.
// Create a new balancer group, add balancer and backends.
// - b1, weight 2, backends [0,1]
// - b2, weight 1, backends [2,3]
// Start the balancer group and check behavior.
//
// Close the balancer group, call add/remove/change weight/change address.
// - b2, weight 3, backends [0,3]
// - b3, weight 1, backends [1,2]
// Start the balancer group again and check for behavior.
// Close the balancer group.
func (s) TestBalancerGroup_start_close(t *testing.T) {
cc := testutils.NewBalancerClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
@@ -97,8 +94,6 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: testBackendEndpoints[2:4]}})
bg.Start()
m1 := make(map[string]balancer.SubConn)
for i := 0; i < 4; i++ {
addrs := <-cc.NewSubConnAddrsCh
@@ -124,42 +119,6 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
for i := 0; i < 4; i++ {
(<-cc.ShutdownSubConnCh).UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
}
// Add b3, weight 1, backends [1,2].
gator.Add(testBalancerIDs[2], 1)
bg.Add(testBalancerIDs[2], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: testBackendEndpoints[1:3]}})
// Remove b1.
gator.Remove(testBalancerIDs[0])
bg.Remove(testBalancerIDs[0])
// Update b2 to weight 3, backends [0,3].
gator.UpdateWeight(testBalancerIDs[1], 3)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: append([]resolver.Endpoint(nil), testBackendEndpoints[0], testBackendEndpoints[3])}})
gator.Start()
bg.Start()
m2 := make(map[string]balancer.SubConn)
for i := 0; i < 4; i++ {
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m2[addrs[0].Addr] = sc
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
// Test roundrobin on the last picker.
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{
m2[testBackendAddrs[0].Addr], m2[testBackendAddrs[0].Addr], m2[testBackendAddrs[0].Addr],
m2[testBackendAddrs[3].Addr], m2[testBackendAddrs[3].Addr], m2[testBackendAddrs[3].Addr],
m2[testBackendAddrs[1].Addr], m2[testBackendAddrs[2].Addr],
}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// Test that balancer group start() doesn't deadlock if the balancer calls back
@@ -197,8 +156,6 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], builder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
}
// initBalancerGroupForCachingTest creates a balancer group, and initializes it
@@ -228,8 +185,6 @@ func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duratio
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: testBackendEndpoints[2:4]}})
bg.Start()
m1 := make(map[string]*testutils.TestSubConn)
for i := 0; i < 4; i++ {
addrs := <-cc.NewSubConnAddrsCh
@@ -517,7 +472,6 @@ func (s) TestBalancerGroupBuildOptions(t *testing.T) {
StateAggregator: nil,
Logger: nil,
})
bg.Start()
// Add the stub balancer build above as a child policy.
balancerBuilder := balancer.Get(balancerName)
@@ -545,7 +499,6 @@ func (s) TestBalancerExitIdleOne(t *testing.T) {
StateAggregator: nil,
Logger: nil,
})
bg.Start()
defer bg.Close()
// Add the stub balancer build above as a child policy.
@@ -581,7 +534,6 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Endpoints: testBackendEndpoints[0:2]}})
bg.Start()
defer bg.Close()
m1 := make(map[string]balancer.SubConn)

View File

@@ -55,7 +55,6 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
Logger: b.logger,
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
})
b.bg.Start()
b.logger.Infof("Created")
return b
}

View File

@@ -71,7 +71,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
Logger: b.logger,
SubBalancerCloseTimeout: DefaultSubBalancerCloseTimeout,
})
b.bg.Start()
go b.run()
b.logger.Infof("Created")
return b