balancergroup: update methods to V2balancer (#3505)

And make balancergroup stop load reporting is load store is not set
This commit is contained in:
Menghan Li 2020-04-09 11:20:37 -07:00 committed by GitHub
parent 709091fe14
commit a9555d046f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 127 additions and 120 deletions

View File

@ -62,7 +62,7 @@ type subBalancerWithConfig struct {
// The static part of sub-balancer. Keeps balancerBuilders and addresses.
// To be used when restarting sub-balancer.
builder balancer.Builder
addrs []resolver.Address
ccState balancer.ClientConnState
// The dynamic part of sub-balancer. Only used when balancer group is
// started. Gets cleared when sub-balancer is closed.
balancer balancer.Balancer
@ -98,13 +98,13 @@ func (sbc *subBalancerWithConfig) startBalancer() {
sbc.group.logger.Infof("Created child policy %p of type %v", b, sbc.builder.Name())
sbc.balancer = b
if ub, ok := b.(balancer.V2Balancer); ok {
ub.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: sbc.addrs}})
} else {
b.HandleResolvedAddrs(sbc.addrs, nil)
ub.UpdateClientConnState(sbc.ccState)
return
}
b.HandleResolvedAddrs(sbc.ccState.ResolverState.Addresses, nil)
}
func (sbc *subBalancerWithConfig) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
func (sbc *subBalancerWithConfig) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b := sbc.balancer
if b == nil {
// This sub-balancer was closed. This can happen when EDS removes a
@ -114,14 +114,14 @@ func (sbc *subBalancerWithConfig) handleSubConnStateChange(sc balancer.SubConn,
return
}
if ub, ok := b.(balancer.V2Balancer); ok {
ub.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: state})
} else {
b.HandleSubConnStateChange(sc, state)
ub.UpdateSubConnState(sc, state)
return
}
b.HandleSubConnStateChange(sc, state.ConnectivityState)
}
func (sbc *subBalancerWithConfig) updateAddrs(addrs []resolver.Address) {
sbc.addrs = addrs
func (sbc *subBalancerWithConfig) updateClientConnState(s balancer.ClientConnState) error {
sbc.ccState = s
b := sbc.balancer
if b == nil {
// This sub-balancer was closed. This should never happen because
@ -132,13 +132,13 @@ func (sbc *subBalancerWithConfig) updateAddrs(addrs []resolver.Address) {
// This will be a common case with priority support, because a
// sub-balancer (and the whole balancer group) could be closed because
// it's the lower priority, but it can still get address updates.
return
return nil
}
if ub, ok := b.(balancer.V2Balancer); ok {
ub.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}})
} else {
b.HandleResolvedAddrs(addrs, nil)
return ub.UpdateClientConnState(s)
}
b.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
return nil
}
func (sbc *subBalancerWithConfig) stopBalancer() {
@ -213,7 +213,7 @@ type BalancerGroup struct {
// incomingMu guards all operations in the direction:
// Sub-balancer-->ClientConn. Including NewSubConn, RemoveSubConn, and
// updatePicker. It also guards the map from SubConn to balancer ID, so
// handleSubConnStateChange needs to hold it shortly to find the
// updateSubConnState needs to hold it shortly to find the
// sub-balancer to forward the update.
//
// The corresponding boolean incomingStarted is used to stop further updates
@ -435,37 +435,35 @@ func (bg *BalancerGroup) ChangeWeight(id internal.Locality, newWeight uint32) {
// Following are actions from the parent grpc.ClientConn, forward to sub-balancers.
// HandleSubConnStateChange handles the state for the subconn. It finds the
// UpdateSubConnState handles the state for the subconn. It finds the
// corresponding balancer and forwards the update.
func (bg *BalancerGroup) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
func (bg *BalancerGroup) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
bg.incomingMu.Lock()
config, ok := bg.scToSubBalancer[sc]
if !ok {
bg.incomingMu.Unlock()
return
}
if state == connectivity.Shutdown {
if state.ConnectivityState == connectivity.Shutdown {
// Only delete sc from the map when state changed to Shutdown.
delete(bg.scToSubBalancer, sc)
}
bg.incomingMu.Unlock()
bg.outgoingMu.Lock()
config.handleSubConnStateChange(sc, state)
config.updateSubConnState(sc, state)
bg.outgoingMu.Unlock()
}
// HandleResolvedAddrs handles addresses from resolver. It finds the balancer
// and forwards the update.
//
// TODO: change this to UpdateClientConnState to handle addresses and balancer
// config.
func (bg *BalancerGroup) HandleResolvedAddrs(id internal.Locality, addrs []resolver.Address) {
// UpdateClientConnState handles ClientState (including balancer config and
// addresses) from resolver. It finds the balancer and forwards the update.
func (bg *BalancerGroup) UpdateClientConnState(id internal.Locality, s balancer.ClientConnState) error {
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
if config, ok := bg.idToBalancerConfig[id]; ok {
config.updateAddrs(addrs)
return config.updateClientConnState(s)
}
bg.outgoingMu.Unlock()
return nil
}
// TODO: handleServiceConfig()
@ -515,7 +513,12 @@ func (bg *BalancerGroup) updateBalancerState(id internal.Locality, state balance
bg.logger.Warningf("balancer group: pickerState for %v not found when update picker/state", id)
return
}
pickerSt.picker = newLoadReportPicker(state.Picker, id, bg.loadStore)
newPicker := state.Picker
if bg.loadStore != nil {
// Only wrap the picker to do load reporting if loadStore was set.
newPicker = newLoadReportPicker(state.Picker, id, bg.loadStore)
}
pickerSt.picker = newPicker
pickerSt.state = state.ConnectivityState
if bg.incomingStarted {
bg.logger.Infof("Child pickers with weight: %+v", bg.idToPickerState)

View File

@ -65,12 +65,12 @@ func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
// Add one balancer to group.
bg.Add(testBalancerIDs[0], 1, rrBuilder)
// Send one resolved address.
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1])
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
// Send subconn state change.
sc1 := <-cc.NewSubConnCh
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
p1 := <-cc.NewPickerCh
@ -82,11 +82,11 @@ func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
}
// Send two addresses.
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
// Expect one new subconn, send state update.
sc2 := <-cc.NewSubConnCh
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin pick.
p2 := <-cc.NewPickerCh
@ -96,12 +96,12 @@ func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
}
// Remove the first address.
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[1:2])
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:2]}})
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
bg.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
bg.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
// Test pick with only the second subconn.
p3 := <-cc.NewPickerCh
@ -122,18 +122,18 @@ func (s) TestBalancerGroup_TwoRR_OneBackend(t *testing.T) {
// Add two balancers to group and send one resolved address to both
// balancers.
bg.Add(testBalancerIDs[0], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1])
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc1 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[0:1])
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc2 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
@ -152,24 +152,24 @@ func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
// Add two balancers to group and send one resolved address to both
// balancers.
bg.Add(testBalancerIDs[0], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
bg.HandleSubConnStateChange(sc3, connectivity.Connecting)
bg.HandleSubConnStateChange(sc3, connectivity.Ready)
bg.HandleSubConnStateChange(sc4, connectivity.Connecting)
bg.HandleSubConnStateChange(sc4, connectivity.Ready)
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
@ -179,7 +179,7 @@ func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
}
// Turn sc2's connection down, should be RR between balancers.
bg.HandleSubConnStateChange(sc2, connectivity.TransientFailure)
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p2 := <-cc.NewPickerCh
// Expect two sc1's in the result, because balancer1 will be picked twice,
// but there's only one sc in it.
@ -189,12 +189,12 @@ func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
}
// Remove sc3's addresses.
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[3:4])
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[3:4]}})
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scToRemove)
}
bg.HandleSubConnStateChange(scToRemove, connectivity.Shutdown)
bg.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
p3 := <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
@ -202,7 +202,7 @@ func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
}
// Turn sc1's connection down.
bg.HandleSubConnStateChange(sc1, connectivity.TransientFailure)
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p4 := <-cc.NewPickerCh
want = []balancer.SubConn{sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p4)); err != nil {
@ -210,7 +210,7 @@ func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
}
// Turn last connection to connecting.
bg.HandleSubConnStateChange(sc4, connectivity.Connecting)
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
p5 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p5.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable {
@ -219,7 +219,7 @@ func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
}
// Turn all connections down.
bg.HandleSubConnStateChange(sc4, connectivity.TransientFailure)
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p6 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p6.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
@ -237,24 +237,24 @@ func (s) TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) {
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.Add(testBalancerIDs[0], 2, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
bg.HandleSubConnStateChange(sc3, connectivity.Connecting)
bg.HandleSubConnStateChange(sc3, connectivity.Ready)
bg.HandleSubConnStateChange(sc4, connectivity.Connecting)
bg.HandleSubConnStateChange(sc4, connectivity.Ready)
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
@ -273,24 +273,24 @@ func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
// Add three balancers to group and send one resolved address to both
// balancers.
bg.Add(testBalancerIDs[0], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:1])
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc1 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[1:2])
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:2]}})
sc2 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[2], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[2], testBackendAddrs[1:2])
bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:2]}})
sc3 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
bg.HandleSubConnStateChange(sc3, connectivity.Connecting)
bg.HandleSubConnStateChange(sc3, connectivity.Ready)
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2, sc3}
@ -311,7 +311,7 @@ func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
}
// move balancer 3 into transient failure.
bg.HandleSubConnStateChange(sc3, connectivity.TransientFailure)
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// Remove the first balancer, while the third is transient failure.
bg.Remove(testBalancerIDs[0])
scToRemove = <-cc.RemoveSubConnCh
@ -335,24 +335,24 @@ func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) {
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.Add(testBalancerIDs[0], 2, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
bg.HandleSubConnStateChange(sc3, connectivity.Connecting)
bg.HandleSubConnStateChange(sc3, connectivity.Ready)
bg.HandleSubConnStateChange(sc4, connectivity.Connecting)
bg.HandleSubConnStateChange(sc4, connectivity.Ready)
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
@ -383,28 +383,28 @@ func (s) TestBalancerGroup_LoadReport(t *testing.T) {
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.Add(testBalancerIDs[0], 2, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
backendToBalancerID[sc1] = testBalancerIDs[0]
backendToBalancerID[sc2] = testBalancerIDs[0]
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
backendToBalancerID[sc3] = testBalancerIDs[1]
backendToBalancerID[sc4] = testBalancerIDs[1]
// Send state changes for both subconns.
bg.HandleSubConnStateChange(sc1, connectivity.Connecting)
bg.HandleSubConnStateChange(sc1, connectivity.Ready)
bg.HandleSubConnStateChange(sc2, connectivity.Connecting)
bg.HandleSubConnStateChange(sc2, connectivity.Ready)
bg.HandleSubConnStateChange(sc3, connectivity.Connecting)
bg.HandleSubConnStateChange(sc3, connectivity.Ready)
bg.HandleSubConnStateChange(sc4, connectivity.Connecting)
bg.HandleSubConnStateChange(sc4, connectivity.Ready)
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
@ -462,9 +462,9 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.Add(testBalancerIDs[0], 2, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
@ -473,8 +473,8 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m1[addrs[0]] = sc
bg.HandleSubConnStateChange(sc, connectivity.Connecting)
bg.HandleSubConnStateChange(sc, connectivity.Ready)
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
// Test roundrobin on the last picker.
@ -490,19 +490,19 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
bg.Close()
for i := 0; i < 4; i++ {
bg.HandleSubConnStateChange(<-cc.RemoveSubConnCh, connectivity.Shutdown)
bg.UpdateSubConnState(<-cc.RemoveSubConnCh, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
}
// Add b3, weight 1, backends [1,2].
bg.Add(testBalancerIDs[2], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[2], testBackendAddrs[1:3])
bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:3]}})
// Remove b1.
bg.Remove(testBalancerIDs[0])
// Update b2 to weight 3, backends [0,3].
bg.ChangeWeight(testBalancerIDs[1], 3)
bg.HandleResolvedAddrs(testBalancerIDs[1], append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3]))
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3])}})
bg.Start()
@ -511,8 +511,8 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m2[addrs[0]] = sc
bg.HandleSubConnStateChange(sc, connectivity.Connecting)
bg.HandleSubConnStateChange(sc, connectivity.Ready)
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
// Test roundrobin on the last picker.
@ -544,9 +544,9 @@ func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
bg := New(cc, nil, nil)
bg.Add(testBalancerIDs[0], 2, &testutils.TestConstBalancerBuilder{})
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
bg.Add(testBalancerIDs[1], 1, &testutils.TestConstBalancerBuilder{})
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
}
@ -570,9 +570,9 @@ func initBalancerGroupForCachingTest(t *testing.T) (*BalancerGroup, *testutils.T
// Add two balancers to group and send two resolved addresses to both
// balancers.
bg.Add(testBalancerIDs[0], 2, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[0], testBackendAddrs[0:2])
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
bg.Add(testBalancerIDs[1], 1, rrBuilder)
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[2:4])
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
@ -581,8 +581,8 @@ func initBalancerGroupForCachingTest(t *testing.T) (*BalancerGroup, *testutils.T
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m1[addrs[0]] = sc
bg.HandleSubConnStateChange(sc, connectivity.Connecting)
bg.HandleSubConnStateChange(sc, connectivity.Ready)
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
// Test roundrobin on the last picker.
@ -627,7 +627,7 @@ func (s) TestBalancerGroup_locality_caching(t *testing.T) {
// Turn down subconn for addr2, shouldn't get picker update because
// sub-balancer1 was removed.
bg.HandleSubConnStateChange(addrToSC[testBackendAddrs[2]], connectivity.TransientFailure)
bg.UpdateSubConnState(addrToSC[testBackendAddrs[2]], balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
for i := 0; i < 10; i++ {
select {
case <-cc.NewPickerCh:
@ -760,7 +760,7 @@ func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *test
}
}
bg.HandleResolvedAddrs(testBalancerIDs[1], testBackendAddrs[4:6])
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[4:6]}})
newSCTimeout := time.After(time.Millisecond * 500)
scToAdd := map[resolver.Address]int{
@ -777,8 +777,8 @@ func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *test
scToAdd[addr[0]] = c - 1
sc := <-cc.NewSubConnCh
addrToSC[addr[0]] = sc
bg.HandleSubConnStateChange(sc, connectivity.Connecting)
bg.HandleSubConnStateChange(sc, connectivity.Ready)
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
case <-newSCTimeout:
t.Fatalf("timeout waiting for subConns (from new sub-balancer) to be newed")
}

View File

@ -139,7 +139,9 @@ func (edsImpl *edsBalancerImpl) HandleChildPolicy(name string, config json.RawMe
// balancer becomes ready).
bgwc.bg.Remove(id)
bgwc.bg.Add(id, config.weight, edsImpl.subBalancerBuilder)
bgwc.bg.HandleResolvedAddrs(id, config.addrs)
bgwc.bg.UpdateClientConnState(id, balancer.ClientConnState{
ResolverState: resolver.State{Addresses: config.addrs},
})
}
}
}
@ -313,7 +315,9 @@ func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroup
if addrsChanged {
config.addrs = newAddrs
bgwc.bg.HandleResolvedAddrs(lid, newAddrs)
bgwc.bg.UpdateClientConnState(lid, balancer.ClientConnState{
ResolverState: resolver.State{Addresses: newAddrs},
})
}
}
@ -344,7 +348,7 @@ func (edsImpl *edsBalancerImpl) HandleSubConnStateChange(sc balancer.SubConn, s
return
}
if bg := bgwc.bg; bg != nil {
bg.HandleSubConnStateChange(sc, s)
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s})
}
}