balancer/weightedtarget: use ConnectivityStateEvaluator (#5734)

Arvind Bright 2022-10-26 11:33:49 -07:00 committed by GitHub
parent 3fd80b0c52
commit 3c09650e05
4 changed files with 50 additions and 72 deletions

View File

@ -55,7 +55,11 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne
cse.numIdle += updateVal
}
}
return cse.CurrentState()
}
// CurrentState returns the current aggregate conn state by evaluating the counters
func (cse *ConnectivityStateEvaluator) CurrentState() connectivity.State {
// Evaluate.
if cse.numReady > 0 {
return connectivity.Ready

View File

@ -57,6 +57,8 @@ type Aggregator struct {
logger *grpclog.PrefixLogger
newWRR func() wrr.WRR
csEvltr *balancer.ConnectivityStateEvaluator
mu sync.Mutex
// If started is false, no updates should be sent to the parent cc. A closed
// sub-balancer could still send pickers to this aggregator. This makes sure
@ -81,11 +83,12 @@ func New(cc balancer.ClientConn, logger *grpclog.PrefixLogger, newWRR func() wrr
cc: cc,
logger: logger,
newWRR: newWRR,
csEvltr: &balancer.ConnectivityStateEvaluator{},
idToPickerState: make(map[string]*weightedPickerState),
}
}
// Start starts the aggregator. It can be called after Close to restart the
// Start starts the aggregator. It can be called after Stop to restart the
// aggregator.
func (wbsa *Aggregator) Start() {
wbsa.mu.Lock()
@ -93,7 +96,7 @@ func (wbsa *Aggregator) Start() {
wbsa.started = true
}
// Stop stops the aggregator. When the aggregator is closed, it won't call
// Stop stops the aggregator. When the aggregator is stopped, it won't call
// parent ClientConn to update balancer state.
func (wbsa *Aggregator) Stop() {
wbsa.mu.Lock()
@ -118,6 +121,9 @@ func (wbsa *Aggregator) Add(id string, weight uint32) {
},
stateToAggregate: connectivity.Connecting,
}
wbsa.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
wbsa.buildAndUpdateLocked()
}
// Remove removes the sub-balancer state. Future updates from this sub-balancer,
@ -128,9 +134,14 @@ func (wbsa *Aggregator) Remove(id string) {
if _, ok := wbsa.idToPickerState[id]; !ok {
return
}
// Setting the state of the deleted sub-balancer to Shutdown will get csEvltr
// to remove the previous state for any aggregated state evaluations.
// Transitions to and from connectivity.Shutdown are ignored by csEvltr.
wbsa.csEvltr.RecordTransition(wbsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown)
// Remove id and picker from picker map. This also results in future updates
// for this ID to be ignored.
delete(wbsa.idToPickerState, id)
wbsa.buildAndUpdateLocked()
}
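
Continuing the sketch above (illustrative, not from the diff): removal relies on the same Shutdown property in reverse, plus an assumed fallback to TransientFailure once every counter is zero.

	// Removing the failed child: TransientFailure -> Shutdown decrements only
	// numTransientFailure, erasing that child's contribution to the aggregate.
	fmt.Println(cse.RecordTransition(connectivity.TransientFailure, connectivity.Shutdown)) // READY

	// Removing the last (Ready) child leaves all counters at zero; the
	// evaluator then falls back to TransientFailure (assumption noted above).
	fmt.Println(cse.RecordTransition(connectivity.Ready, connectivity.Shutdown)) // TRANSIENT_FAILURE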
// UpdateWeight updates the weight for the given id. Note that this doesn't
@ -180,6 +191,9 @@ func (wbsa *Aggregator) UpdateState(id string, newState balancer.State) {
// it's either removed, or never existed.
return
}
wbsa.csEvltr.RecordTransition(oldState.stateToAggregate, newState.ConnectivityState)
if !(oldState.state.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting) {
// If old state is TransientFailure, and new state is Connecting, don't
// update the state, to prevent the aggregated state from being always
@ -189,18 +203,7 @@ func (wbsa *Aggregator) UpdateState(id string, newState balancer.State) {
}
oldState.state = newState
if !wbsa.started {
return
}
if wbsa.pauseUpdateState {
// If updates are paused, do not call UpdateState, but remember that we
// need to call it when they are resumed.
wbsa.needUpdateStateOnResume = true
return
}
wbsa.cc.UpdateState(wbsa.build())
wbsa.buildAndUpdateLocked()
}
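
Not part of the diff: a self-contained sketch of the guard's effect, with stickyTF as a hypothetical name. Without the guard, a child that endlessly retries would flip the aggregate input back to CONNECTING on every attempt; with it, the state that feeds aggregation stays TRANSIENT_FAILURE until the child genuinely recovers (the child's picker is still refreshed on every update).

package main

import (
	"fmt"

	"google.golang.org/grpc/connectivity"
)

// stickyTF mirrors the guard in UpdateState: once a child is in
// TransientFailure, a bare retry back to Connecting does not change the
// state used for aggregation.
func stickyTF(stored, incoming connectivity.State) connectivity.State {
	if stored == connectivity.TransientFailure && incoming == connectivity.Connecting {
		return stored
	}
	return incoming
}

func main() {
	s := connectivity.TransientFailure
	for _, next := range []connectivity.State{
		connectivity.Connecting,       // retry: stays TRANSIENT_FAILURE
		connectivity.TransientFailure, // retry failed: stays TRANSIENT_FAILURE
		connectivity.Ready,            // real recovery flips it to READY
	} {
		s = stickyTF(s, next)
		fmt.Println(s)
	}
}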
// clearStates resets everything to the init state (Connecting) but keeps the entry in
@ -217,11 +220,11 @@ func (wbsa *Aggregator) clearStates() {
}
}
// BuildAndUpdate combines the sub-state from each sub-balancer into one state,
// and update it to parent ClientConn.
func (wbsa *Aggregator) BuildAndUpdate() {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
// buildAndUpdateLocked aggregates the connectivity states of the sub-balancers,
// builds a new picker and sends an update to the parent ClientConn.
//
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) buildAndUpdateLocked() {
if !wbsa.started {
return
}
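
The Locked suffix follows a common Go convention: the method assumes the caller already holds wbsa.mu, which lets Add, Remove and UpdateState combine the rebuild with their own locked bookkeeping without re-acquiring the mutex. A generic illustration (hypothetical names, not from this repo):

package main

import "sync"

type counter struct {
	mu sync.Mutex
	n  int
}

// incLocked requires c.mu to be held by the caller, so it composes with
// other mu-protected steps without deadlocking on a re-acquire.
func (c *counter) incLocked() { c.n++ }

// Inc is the public entry point: take the lock once, then delegate.
func (c *counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.incLocked()
}

func main() {
	c := &counter{}
	c.Inc()
}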
@ -240,48 +243,34 @@ func (wbsa *Aggregator) BuildAndUpdate() {
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) build() balancer.State {
wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)
// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
// state.
var readyN, connectingN, idleN int
pickerN := len(wbsa.idToPickerState)
readyPickers := make([]weightedPickerState, 0, pickerN)
errorPickers := make([]weightedPickerState, 0, pickerN)
for _, ps := range wbsa.idToPickerState {
switch ps.stateToAggregate {
case connectivity.Ready:
readyN++
readyPickers = append(readyPickers, *ps)
case connectivity.Connecting:
connectingN++
case connectivity.Idle:
idleN++
case connectivity.TransientFailure:
errorPickers = append(errorPickers, *ps)
}
}
var aggregatedState connectivity.State
switch {
case readyN > 0:
aggregatedState = connectivity.Ready
case connectingN > 0:
aggregatedState = connectivity.Connecting
case idleN > 0:
aggregatedState = connectivity.Idle
default:
aggregatedState = connectivity.TransientFailure
}
// Make sure picker's return error is consistent with the aggregatedState.
var picker balancer.Picker
switch aggregatedState {
case connectivity.TransientFailure:
picker = newWeightedPickerGroup(errorPickers, wbsa.newWRR)
pickers := make([]weightedPickerState, 0, len(wbsa.idToPickerState))
switch aggState := wbsa.csEvltr.CurrentState(); aggState {
case connectivity.Connecting:
picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
return balancer.State{
ConnectivityState: aggState,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)}
case connectivity.TransientFailure:
// This means that all sub-balancers are now in TransientFailure.
for _, ps := range wbsa.idToPickerState {
pickers = append(pickers, *ps)
}
return balancer.State{
ConnectivityState: aggState,
Picker: newWeightedPickerGroup(pickers, wbsa.newWRR)}
default:
picker = newWeightedPickerGroup(readyPickers, wbsa.newWRR)
for _, ps := range wbsa.idToPickerState {
if ps.stateToAggregate == connectivity.Ready {
pickers = append(pickers, *ps)
}
}
return balancer.State{
ConnectivityState: aggState,
Picker: newWeightedPickerGroup(pickers, wbsa.newWRR)}
}
return balancer.State{ConnectivityState: aggregatedState, Picker: picker}
}
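
To summarize the new switch, a sketch (not from the commit) that replays child states into a fresh evaluator the same way Add seeds it, with comments noting which picker build would return in each case:

package main

import (
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
)

// aggregate replays each child state using Shutdown as the "no previous
// state" sentinel, mirroring the Add path above.
func aggregate(children ...connectivity.State) connectivity.State {
	cse := &balancer.ConnectivityStateEvaluator{}
	for _, s := range children {
		cse.RecordTransition(connectivity.Shutdown, s)
	}
	return cse.CurrentState()
}

func main() {
	// READY: build returns a weighted group over the Ready children only.
	fmt.Println(aggregate(connectivity.Ready, connectivity.TransientFailure))

	// CONNECTING: build returns an ErrPicker wrapping ErrNoSubConnAvailable,
	// which makes the ClientConn queue picks until the next update.
	fmt.Println(aggregate(connectivity.Connecting, connectivity.TransientFailure))

	// TRANSIENT_FAILURE: all children are failing, so build returns a
	// weighted group over all of them and RPCs surface their errors.
	fmt.Println(aggregate(connectivity.TransientFailure, connectivity.TransientFailure))
}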
type weightedPickerGroup struct {

View File

@ -88,8 +88,6 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
}
addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
var rebuildStateAndPicker bool
b.stateAggregator.PauseStateUpdates()
defer b.stateAggregator.ResumeStateUpdates()
@ -98,9 +96,6 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
if _, ok := newConfig.Targets[name]; !ok {
b.stateAggregator.Remove(name)
b.bg.Remove(name)
// Trigger a state/picker update, because we don't want `ClientConn`
// to pick this sub-balancer anymore.
rebuildStateAndPicker = true
}
}
@ -119,21 +114,15 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
// Don't trigger a state/picker update. Wait for the new sub-balancer
// to send its updates.
} else if newT.ChildPolicy.Name != oldT.ChildPolicy.Name {
// If the child policy name is differet, remove from balancer group
// If the child policy name is different, remove from balancer group
// and re-add.
b.stateAggregator.Remove(name)
b.bg.Remove(name)
b.stateAggregator.Add(name, newT.Weight)
b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
// Trigger a state/picker update, because we don't want `ClientConn`
// to pick this sub-balancer anymore.
rebuildStateAndPicker = true
} else if newT.Weight != oldT.Weight {
// If this is an existing sub-balancer, update weight if necessary.
b.stateAggregator.UpdateWeight(name, newT.Weight)
// Trigger a state/picker update, because we want `ClientConn` to do
// picks with the new weights now.
rebuildStateAndPicker = true
}
// Forwards all the updates:
@ -154,9 +143,6 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
b.targets = newConfig.Targets
if rebuildStateAndPicker {
b.stateAggregator.BuildAndUpdate()
}
return nil
}
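
With Add and Remove now rebuilding the picker themselves, UpdateClientConnState no longer needs the rebuildStateAndPicker flag: the PauseStateUpdates/ResumeStateUpdates pair wrapping the method coalesces the intermediate rebuilds. A hypothetical driver (aggregator construction elided; target names are made up):

	gator.PauseStateUpdates()  // rebuilds below are remembered, not sent
	gator.Remove("old-target") // records <state> -> Shutdown, rebuild deferred
	gator.Add("new-target", 1) // records Shutdown -> Connecting, rebuild deferred
	gator.ResumeStateUpdates() // at most one UpdateState reaches the parent ClientConn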

View File

@ -240,7 +240,6 @@ func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregat
gator.Remove(testBalancerIDs[1])
bg.Remove(testBalancerIDs[1])
gator.BuildAndUpdate()
// Don't wait for SubConns to be removed after close, because they are only
// removed after close timeout.
for i := 0; i < 10; i++ {