weightedtarget: return a more meaningful error when no child policy is reporting READY (#5391)

This commit is contained in:
Arvind Bright 2022-10-17 14:38:11 -07:00 committed by GitHub
parent bb3d739418
commit eb8aa3192b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 26 deletions

View File

@ -240,20 +240,23 @@ func (wbsa *Aggregator) BuildAndUpdate() {
// Caller must hold wbsa.mu. // Caller must hold wbsa.mu.
func (wbsa *Aggregator) build() balancer.State { func (wbsa *Aggregator) build() balancer.State {
wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState) wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)
m := wbsa.idToPickerState
// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated // TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
// state. // state.
var readyN, connectingN, idleN int var readyN, connectingN, idleN int
readyPickerWithWeights := make([]weightedPickerState, 0, len(m)) pickerN := len(wbsa.idToPickerState)
for _, ps := range m { readyPickers := make([]weightedPickerState, 0, pickerN)
errorPickers := make([]weightedPickerState, 0, pickerN)
for _, ps := range wbsa.idToPickerState {
switch ps.stateToAggregate { switch ps.stateToAggregate {
case connectivity.Ready: case connectivity.Ready:
readyN++ readyN++
readyPickerWithWeights = append(readyPickerWithWeights, *ps) readyPickers = append(readyPickers, *ps)
case connectivity.Connecting: case connectivity.Connecting:
connectingN++ connectingN++
case connectivity.Idle: case connectivity.Idle:
idleN++ idleN++
case connectivity.TransientFailure:
errorPickers = append(errorPickers, *ps)
} }
} }
var aggregatedState connectivity.State var aggregatedState connectivity.State
@ -272,11 +275,11 @@ func (wbsa *Aggregator) build() balancer.State {
var picker balancer.Picker var picker balancer.Picker
switch aggregatedState { switch aggregatedState {
case connectivity.TransientFailure: case connectivity.TransientFailure:
picker = base.NewErrPicker(balancer.ErrTransientFailure) picker = newWeightedPickerGroup(errorPickers, wbsa.newWRR)
case connectivity.Connecting: case connectivity.Connecting:
picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable) picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
default: default:
picker = newWeightedPickerGroup(readyPickerWithWeights, wbsa.newWRR) picker = newWeightedPickerGroup(readyPickers, wbsa.newWRR)
} }
return balancer.State{ConnectivityState: aggregatedState, Picker: picker} return balancer.State{ConnectivityState: aggregatedState, Picker: picker}
} }

View File

@ -20,7 +20,9 @@ package weightedtarget
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"strings"
"testing" "testing"
"time" "time"
@ -569,7 +571,11 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
} }
// Turn sc1's connection down. // Turn sc1's connection down.
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) scConnErr := errors.New("subConn connection error")
wtb.UpdateSubConnState(sc1, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
p = <-cc.NewPickerCh p = <-cc.NewPickerCh
want = []balancer.SubConn{sc4} want = []balancer.SubConn{sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p)); err != nil { if err := testutils.IsRoundRobin(want, subConnFromPicker(p)); err != nil {
@ -586,11 +592,14 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
} }
// Turn all connections down. // Turn all connections down.
wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) wtb.UpdateSubConnState(sc4, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
p = <-cc.NewPickerCh p = <-cc.NewPickerCh
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
if _, err := p.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure { if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), scConnErr.Error()) {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) t.Fatalf("want pick error %q, got error %q", scConnErr, err)
} }
} }
} }
@ -793,7 +802,11 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
} }
// Move balancer 3 into transient failure. // Move balancer 3 into transient failure.
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) scConnErr := errors.New("subConn connection error")
wtb.UpdateSubConnState(sc3, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
<-cc.NewPickerCh <-cc.NewPickerCh
// Remove the first balancer, while the third is transient failure. // Remove the first balancer, while the third is transient failure.
@ -827,8 +840,8 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved) t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
} }
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
if _, err := p.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure { if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), scConnErr.Error()) {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err) t.Fatalf("want pick error %q, got error %q", scConnErr, err)
} }
} }
} }
@ -1064,15 +1077,21 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes
// Set both subconn to TransientFailure, this will put both sub-balancers in // Set both subconn to TransientFailure, this will put both sub-balancers in
// transient failure. // transient failure.
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) scConnErr := errors.New("subConn connection error")
wtb.UpdateSubConnState(sc1, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
<-cc.NewPickerCh <-cc.NewPickerCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) wtb.UpdateSubConnState(sc2, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
p := <-cc.NewPickerCh p := <-cc.NewPickerCh
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
r, err := p.Pick(balancer.PickInfo{}) if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), scConnErr.Error()) {
if err != balancer.ErrTransientFailure { t.Fatalf("want pick error %q, got error %q", scConnErr, err)
t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrTransientFailure, r, err)
} }
} }
@ -1086,8 +1105,8 @@ func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *tes
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
r, err := p.Pick(balancer.PickInfo{}) r, err := p.Pick(balancer.PickInfo{})
if err != balancer.ErrTransientFailure { if err == nil || !strings.Contains(err.Error(), scConnErr.Error()) {
t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrTransientFailure, r, err) t.Fatalf("want pick error %q, got result %v, err %q", scConnErr, r, err)
} }
} }
} }

View File

@ -19,6 +19,7 @@ package clusterresolver
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"testing" "testing"
"time" "time"
@ -247,7 +248,11 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
} }
// Turn down 1, use 2 // Turn down 1, use 2
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) scConnErr := errors.New("subConn connection error")
edsb.UpdateSubConnState(sc1, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
addrs2 := <-cc.NewSubConnAddrsCh addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want { if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want) t.Fatalf("sc is created with addr %v, want %v", got, want)
@ -274,7 +279,8 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
} }
// Should get an update with 1's old picker, to override 2's old picker. // Should get an update with 1's old picker, to override 2's old picker.
if err := testErrPickerFromCh(cc.NewPickerCh, balancer.ErrTransientFailure); err != nil { want := errors.New("last connection error: subConn connection error")
if err := testErrPickerFromCh(cc.NewPickerCh, want); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -305,10 +311,15 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
} }
sc1 := <-cc.NewSubConnCh sc1 := <-cc.NewSubConnCh
// Turn down 1, pick should error. // Turn down 1, pick should error.
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) scConnErr := errors.New("subConn connection error")
edsb.UpdateSubConnState(sc1, balancer.SubConnState{
ConnectivityState: connectivity.TransientFailure,
ConnectionError: scConnErr,
})
// Test pick failure. // Test pick failure.
if err := testErrPickerFromCh(cc.NewPickerCh, balancer.ErrTransientFailure); err != nil { want := errors.New("last connection error: subConn connection error")
if err := testErrPickerFromCh(cc.NewPickerCh, want); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -463,8 +474,8 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
// Add localities to existing priorities. // Add localities to existing priorities.
// //
// - start with 2 locality with p0 and p1 // - start with 2 locality with p0 and p1
// - add localities to existing p0 and p1 // - add localities to existing p0 and p1
func (s) TestEDSPriority_MultipleLocalities(t *testing.T) { func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup() defer cleanup()