diff --git a/balancer/conn_state_evaluator.go b/balancer/conn_state_evaluator.go
index 33ea9b582..c33413581 100644
--- a/balancer/conn_state_evaluator.go
+++ b/balancer/conn_state_evaluator.go
@@ -55,7 +55,11 @@ func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState conne
 			cse.numIdle += updateVal
 		}
 	}
+	return cse.CurrentState()
+}
 
+// CurrentState returns the current aggregate conn state by evaluating the counters
+func (cse *ConnectivityStateEvaluator) CurrentState() connectivity.State {
 	// Evaluate.
 	if cse.numReady > 0 {
 		return connectivity.Ready
diff --git a/balancer/weightedtarget/weightedaggregator/aggregator.go b/balancer/weightedtarget/weightedaggregator/aggregator.go
index dffc539b8..176d3ec0c 100644
--- a/balancer/weightedtarget/weightedaggregator/aggregator.go
+++ b/balancer/weightedtarget/weightedaggregator/aggregator.go
@@ -57,6 +57,8 @@ type Aggregator struct {
 	logger *grpclog.PrefixLogger
 	newWRR func() wrr.WRR
 
+	csEvltr *balancer.ConnectivityStateEvaluator
+
 	mu sync.Mutex
 	// If started is false, no updates should be sent to the parent cc. A closed
 	// sub-balancer could still send pickers to this aggregator. This makes sure
@@ -81,11 +83,12 @@ func New(cc balancer.ClientConn, logger *grpclog.PrefixLogger, newWRR func() wrr
 		cc:              cc,
 		logger:          logger,
 		newWRR:          newWRR,
+		csEvltr:         &balancer.ConnectivityStateEvaluator{},
 		idToPickerState: make(map[string]*weightedPickerState),
 	}
 }
 
-// Start starts the aggregator. It can be called after Close to restart the
+// Start starts the aggregator. It can be called after Stop to restart the
 // aggretator.
 func (wbsa *Aggregator) Start() {
 	wbsa.mu.Lock()
@@ -93,7 +96,7 @@ func (wbsa *Aggregator) Start() {
 	wbsa.started = true
 }
 
-// Stop stops the aggregator. When the aggregator is closed, it won't call
+// Stop stops the aggregator. When the aggregator is stopped, it won't call
 // parent ClientConn to update balancer state.
 func (wbsa *Aggregator) Stop() {
 	wbsa.mu.Lock()
@@ -118,6 +121,9 @@ func (wbsa *Aggregator) Add(id string, weight uint32) {
 		},
 		stateToAggregate: connectivity.Connecting,
 	}
+	wbsa.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
+
+	wbsa.buildAndUpdateLocked()
 }
 
 // Remove removes the sub-balancer state. Future updates from this sub-balancer,
@@ -128,9 +134,14 @@ func (wbsa *Aggregator) Remove(id string) {
 	if _, ok := wbsa.idToPickerState[id]; !ok {
 		return
 	}
+	// Setting the state of the deleted sub-balancer to Shutdown will get csEvltr
+	// to remove the previous state for any aggregated state evaluations.
+	// transitions to and from connectivity.Shutdown are ignored by csEvltr.
+	wbsa.csEvltr.RecordTransition(wbsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown)
 	// Remove id and picker from picker map. This also results in future updates
 	// for this ID to be ignored.
 	delete(wbsa.idToPickerState, id)
+	wbsa.buildAndUpdateLocked()
 }
 
 // UpdateWeight updates the weight for the given id. Note that this doesn't
@@ -180,6 +191,9 @@ func (wbsa *Aggregator) UpdateState(id string, newState balancer.State) {
 		// it's either removed, or never existed.
 		return
 	}
+
+	wbsa.csEvltr.RecordTransition(oldState.stateToAggregate, newState.ConnectivityState)
+
 	if !(oldState.state.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting) {
 		// If old state is TransientFailure, and new state is Connecting, don't
 		// update the state, to prevent the aggregated state from being always
@@ -189,18 +203,7 @@ func (wbsa *Aggregator) UpdateState(id string, newState balancer.State) {
 	}
 	oldState.state = newState
 
-	if !wbsa.started {
-		return
-	}
-
-	if wbsa.pauseUpdateState {
-		// If updates are paused, do not call UpdateState, but remember that we
-		// need to call it when they are resumed.
-		wbsa.needUpdateStateOnResume = true
-		return
-	}
-
-	wbsa.cc.UpdateState(wbsa.build())
+	wbsa.buildAndUpdateLocked()
 }
 
 // clearState Reset everything to init state (Connecting) but keep the entry in
@@ -217,11 +220,11 @@ func (wbsa *Aggregator) clearStates() {
 	}
 }
 
-// BuildAndUpdate combines the sub-state from each sub-balancer into one state,
-// and update it to parent ClientConn.
-func (wbsa *Aggregator) BuildAndUpdate() {
-	wbsa.mu.Lock()
-	defer wbsa.mu.Unlock()
+// buildAndUpdateLocked aggregates the connectivity states of the sub-balancers,
+// builds a new picker and sends an update to the parent ClientConn.
+//
+// Caller must hold wbsa.mu.
+func (wbsa *Aggregator) buildAndUpdateLocked() {
 	if !wbsa.started {
 		return
 	}
@@ -240,48 +243,34 @@
 // Caller must hold wbsa.mu.
 func (wbsa *Aggregator) build() balancer.State {
 	wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)
-	// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
-	// state.
-	var readyN, connectingN, idleN int
-	pickerN := len(wbsa.idToPickerState)
-	readyPickers := make([]weightedPickerState, 0, pickerN)
-	errorPickers := make([]weightedPickerState, 0, pickerN)
-	for _, ps := range wbsa.idToPickerState {
-		switch ps.stateToAggregate {
-		case connectivity.Ready:
-			readyN++
-			readyPickers = append(readyPickers, *ps)
-		case connectivity.Connecting:
-			connectingN++
-		case connectivity.Idle:
-			idleN++
-		case connectivity.TransientFailure:
-			errorPickers = append(errorPickers, *ps)
-		}
-	}
-	var aggregatedState connectivity.State
-	switch {
-	case readyN > 0:
-		aggregatedState = connectivity.Ready
-	case connectingN > 0:
-		aggregatedState = connectivity.Connecting
-	case idleN > 0:
-		aggregatedState = connectivity.Idle
-	default:
-		aggregatedState = connectivity.TransientFailure
-	}
 
 	// Make sure picker's return error is consistent with the aggregatedState.
-	var picker balancer.Picker
-	switch aggregatedState {
-	case connectivity.TransientFailure:
-		picker = newWeightedPickerGroup(errorPickers, wbsa.newWRR)
+	pickers := make([]weightedPickerState, 0, len(wbsa.idToPickerState))
+
+	switch aggState := wbsa.csEvltr.CurrentState(); aggState {
 	case connectivity.Connecting:
-		picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
+		return balancer.State{
+			ConnectivityState: aggState,
+			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable)}
+	case connectivity.TransientFailure:
+		// this means that all sub-balancers are now in TransientFailure.
+		for _, ps := range wbsa.idToPickerState {
+			pickers = append(pickers, *ps)
+		}
+		return balancer.State{
+			ConnectivityState: aggState,
+			Picker:            newWeightedPickerGroup(pickers, wbsa.newWRR)}
 	default:
-		picker = newWeightedPickerGroup(readyPickers, wbsa.newWRR)
+		for _, ps := range wbsa.idToPickerState {
+			if ps.stateToAggregate == connectivity.Ready {
+				pickers = append(pickers, *ps)
+			}
+		}
+		return balancer.State{
+			ConnectivityState: aggState,
+			Picker:            newWeightedPickerGroup(pickers, wbsa.newWRR)}
 	}
-	return balancer.State{ConnectivityState: aggregatedState, Picker: picker}
+
 }
 
 type weightedPickerGroup struct {
diff --git a/balancer/weightedtarget/weightedtarget.go b/balancer/weightedtarget/weightedtarget.go
index 2582c84c5..83bb7d701 100644
--- a/balancer/weightedtarget/weightedtarget.go
+++ b/balancer/weightedtarget/weightedtarget.go
@@ -88,8 +88,6 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
 	}
 	addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
 
-	var rebuildStateAndPicker bool
-
 	b.stateAggregator.PauseStateUpdates()
 	defer b.stateAggregator.ResumeStateUpdates()
 
@@ -98,9 +96,6 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
 		if _, ok := newConfig.Targets[name]; !ok {
 			b.stateAggregator.Remove(name)
 			b.bg.Remove(name)
-			// Trigger a state/picker update, because we don't want `ClientConn`
-			// to pick this sub-balancer anymore.
-			rebuildStateAndPicker = true
 		}
 	}
 
@@ -119,21 +114,15 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
 			// Not trigger a state/picker update. Wait for the new sub-balancer
 			// to send its updates.
 		} else if newT.ChildPolicy.Name != oldT.ChildPolicy.Name {
-			// If the child policy name is differet, remove from balancer group
+			// If the child policy name is different, remove from balancer group
 			// and re-add.
 			b.stateAggregator.Remove(name)
 			b.bg.Remove(name)
 			b.stateAggregator.Add(name, newT.Weight)
 			b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
-			// Trigger a state/picker update, because we don't want `ClientConn`
-			// to pick this sub-balancer anymore.
-			rebuildStateAndPicker = true
 		} else if newT.Weight != oldT.Weight {
 			// If this is an existing sub-balancer, update weight if necessary.
 			b.stateAggregator.UpdateWeight(name, newT.Weight)
-			// Trigger a state/picker update, because we don't want `ClientConn`
-			// should do picks with the new weights now.
-			rebuildStateAndPicker = true
 		}
 
 		// Forwards all the update:
@@ -154,9 +143,6 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
 
 	b.targets = newConfig.Targets
 
-	if rebuildStateAndPicker {
-		b.stateAggregator.BuildAndUpdate()
-	}
 	return nil
 }
 
diff --git a/internal/balancergroup/balancergroup_test.go b/internal/balancergroup/balancergroup_test.go
index 1228f85ad..90c5c20f4 100644
--- a/internal/balancergroup/balancergroup_test.go
+++ b/internal/balancergroup/balancergroup_test.go
@@ -240,7 +240,6 @@ func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregat
 
 	gator.Remove(testBalancerIDs[1])
 	bg.Remove(testBalancerIDs[1])
-	gator.BuildAndUpdate()
 	// Don't wait for SubConns to be removed after close, because they are only
 	// removed after close timeout.
 	for i := 0; i < 10; i++ {
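
Note (not part of the patch above): the sketch below illustrates how the balancer.ConnectivityStateEvaluator wired into the aggregator behaves, assuming the CurrentState method added in this diff. The two hypothetical children and the printed states are illustrative only and are not taken from the gRPC test suite.

	package main

	import (
		"fmt"

		"google.golang.org/grpc/balancer"
		"google.golang.org/grpc/connectivity"
	)

	func main() {
		cse := &balancer.ConnectivityStateEvaluator{}

		// Mirror Aggregator.Add: a new child starts in Connecting, recorded as a
		// transition from Shutdown so only the Connecting counter is incremented.
		cse.RecordTransition(connectivity.Shutdown, connectivity.Connecting) // hypothetical child "a"
		cse.RecordTransition(connectivity.Shutdown, connectivity.Connecting) // hypothetical child "b"
		fmt.Println(cse.CurrentState()) // CONNECTING

		// One child becomes Ready; any Ready child makes the aggregate Ready.
		cse.RecordTransition(connectivity.Connecting, connectivity.Ready)
		fmt.Println(cse.CurrentState()) // READY

		// Mirror Aggregator.Remove: record a transition to Shutdown so the child's
		// previous state stops counting toward the aggregate.
		cse.RecordTransition(connectivity.Ready, connectivity.Shutdown)
		fmt.Println(cse.CurrentState()) // CONNECTING again
	}

Because RecordTransition ignores connectivity.Shutdown, Aggregator.Add and Aggregator.Remove can register and unregister a child simply by recording a transition from or to Shutdown, with no separate reset of the evaluator's counters.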