xds/clustermanager: pause picker updates during UpdateClientConnState (#5528)

This commit is contained in:
Doug Fawley 2022-07-21 22:31:07 +00:00 committed by GitHub
parent 86117db53e
commit fdc5d2f3da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 96 additions and 0 deletions

View File

@ -57,6 +57,11 @@ type balancerStateAggregator struct {
//
// If an ID is not in map, it's either removed or never added.
idToPickerState map[string]*subBalancerState
// Set when UpdateState call propagation is paused.
pauseUpdateState bool
// Set when UpdateState call propagation is paused and an UpdateState call
// is suppressed.
needUpdateStateOnResume bool
}
func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger) *balancerStateAggregator {
@ -118,6 +123,27 @@ func (bsa *balancerStateAggregator) remove(id string) {
delete(bsa.idToPickerState, id)
}
// pauseStateUpdates causes UpdateState calls to not propagate to the parent
// ClientConn. The last state will be remembered and propagated when
// ResumeStateUpdates is called.
func (bsa *balancerStateAggregator) pauseStateUpdates() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
bsa.pauseUpdateState = true
bsa.needUpdateStateOnResume = false
}
// resumeStateUpdates will resume propagating UpdateState calls to the parent,
// and call UpdateState on the parent if any UpdateState call was suppressed.
func (bsa *balancerStateAggregator) resumeStateUpdates() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
bsa.pauseUpdateState = false
if bsa.needUpdateStateOnResume {
bsa.cc.UpdateState(bsa.build())
}
}
// UpdateState is called to report a balancer state change from sub-balancer.
// It's usually called by the balancer group.
//
@ -143,6 +169,12 @@ func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State)
if !bsa.started {
return
}
if bsa.pauseUpdateState {
// If updates are paused, do not call UpdateState, but remember that we
// need to call it when they are resumed.
bsa.needUpdateStateOnResume = true
return
}
bsa.cc.UpdateState(bsa.build())
}
@ -168,6 +200,12 @@ func (bsa *balancerStateAggregator) buildAndUpdate() {
if !bsa.started {
return
}
if bsa.pauseUpdateState {
// If updates are paused, do not call UpdateState, but remember that we
// need to call it when they are resumed.
bsa.needUpdateStateOnResume = true
return
}
bsa.cc.UpdateState(bsa.build())
}

View File

@ -123,6 +123,8 @@ func (b *bal) UpdateClientConnState(s balancer.ClientConnState) error {
}
b.logger.Infof("update with config %+v, resolver state %+v", pretty.ToJSON(s.BalancerConfig), s.ResolverState)
b.stateAggregator.pauseStateUpdates()
defer b.stateAggregator.resumeStateUpdates()
b.updateChildren(s, newConfig)
return nil
}

View File

@ -763,3 +763,59 @@ func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
}
wb.ClientConn.UpdateState(state)
}
// tcc wraps a testutils.TestClientConn but stores all state transitions in a
// slice.
type tcc struct {
*testutils.TestClientConn
states []balancer.State
}
func (t *tcc) UpdateState(bs balancer.State) {
t.states = append(t.states, bs)
t.TestClientConn.UpdateState(bs)
}
func (s) TestUpdateStatePauses(t *testing.T) {
cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)}
balFuncs := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: nil})
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: nil})
return nil
},
}
stub.Register("update_state_balancer", balFuncs)
rtb := rtBuilder.Build(cc, balancer.BuildOptions{})
configJSON1 := `{
"children": {
"cds:cluster_1":{ "childPolicy": [{"update_state_balancer":""}] }
}
}`
config1, err := rtParser.ParseConfig([]byte(configJSON1))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Verify that the only state update is the second one called by the child.
if len(cc.states) != 1 || cc.states[0].ConnectivityState != connectivity.Ready {
t.Fatalf("cc.states = %v; want [connectivity.Ready]", cc.states)
}
}