xds/priority: bug fix and minor behavior change (#5417)

Doug Fawley 2022-06-17 18:14:31 +00:00 committed by GitHub
parent 29d9970c51
commit 3e7b97febc
9 changed files with 468 additions and 326 deletions

View File

@@ -45,6 +45,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
 		scStates: make(map[balancer.SubConn]connectivity.State),
 		csEvltr:  &balancer.ConnectivityStateEvaluator{},
 		config:   bb.config,
+		state:    connectivity.Connecting,
 	}
 	// Initialize picker to a picker that always returns
 	// ErrNoSubConnAvailable, because when state of a SubConn changes, we
@@ -134,6 +135,9 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
 		b.ResolverError(errors.New("produced zero addresses"))
 		return balancer.ErrBadResolverState
 	}
+
+	b.regeneratePicker()
+	b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
 	return nil
 }
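With the two changes above, a base balancer starts in CONNECTING and publishes a fresh picker on every successful ClientConnState update, instead of waiting for a SubConn state change. A rough sketch of a test that could observe this (not part of the commit; it reuses the NewTestClientConn and WaitForPickerWithErr helpers that appear later in this diff, and the function name and timeout are arbitrary):

	package example

	import (
		"context"
		"testing"
		"time"

		"google.golang.org/grpc/balancer"
		"google.golang.org/grpc/balancer/roundrobin"
		"google.golang.org/grpc/internal/testutils"
		"google.golang.org/grpc/resolver"
	)

	func testConnectingPickerOnResolverUpdate(t *testing.T) {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		defer cancel()

		cc := testutils.NewTestClientConn(t)
		b := balancer.Get(roundrobin.Name).Build(cc, balancer.BuildOptions{})
		defer b.Close()

		// One address; no SubConn has reported READY yet.
		if err := b.UpdateClientConnState(balancer.ClientConnState{
			ResolverState: resolver.State{Addresses: []resolver.Address{{Addr: "localhost:1234"}}},
		}); err != nil {
			t.Fatal(err)
		}

		// The update itself now produces a CONNECTING-state picker that fails
		// picks with ErrNoSubConnAvailable, so RPCs queue and re-pick instead
		// of failing.
		if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
			t.Fatal(err)
		}
	}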

View File

@@ -188,9 +188,9 @@ func (tcc *TestClientConn) WaitForErrPicker(ctx context.Context) error {
 }

 // WaitForPickerWithErr waits until an error picker is pushed to this
-// ClientConn with the error matching the wanted error. Also drains the
-// matching entry from the state channel. Returns an error if the provided
-// context expires, including the last received picker error (if any).
+// ClientConn with the error matching the wanted error. Returns an error if
+// the provided context expires, including the last received picker error (if
+// any).
 func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error) error {
 	lastErr := errors.New("received no picker")
 	for {
@@ -198,7 +198,6 @@ func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error)
 		case <-ctx.Done():
 			return fmt.Errorf("timeout when waiting for an error picker with %v; last picker error: %v", want, lastErr)
 		case picker := <-tcc.NewPickerCh:
-			<-tcc.NewStateCh
 			for i := 0; i < 5; i++ {
 				if _, lastErr = picker.Pick(balancer.PickInfo{}); lastErr == nil || lastErr.Error() != want.Error() {
 					break
@@ -210,9 +209,8 @@ func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error)
 }

 // WaitForConnectivityState waits until the state pushed to this ClientConn
-// matches the wanted state. Also drains the matching entry from the picker
-// channel. Returns an error if the provided context expires, including the
-// last received state (if any).
+// matches the wanted state. Returns an error if the provided context expires,
+// including the last received state (if any).
 func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want connectivity.State) error {
 	var lastState connectivity.State = -1
 	for {
@@ -220,7 +218,6 @@ func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want co
 		case <-ctx.Done():
 			return fmt.Errorf("timeout when waiting for state to be %s; last state: %s", want, lastState)
 		case s := <-tcc.NewStateCh:
-			<-tcc.NewPickerCh
 			if s == want {
 				return nil
 			}
@@ -230,17 +227,22 @@ func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want co
 }

 // WaitForRoundRobinPicker waits for a picker that passes IsRoundRobin. Also
-// drains the matching state channel and requires it to be READY to be
-// considered. Returns an error if the provided context expires, including the
-// last received error from IsRoundRobin or the picker (if any).
+// drains the matching state channel and requires it to be READY (if an entry
+// is pending) to be considered. Returns an error if the provided context
+// expires, including the last received error from IsRoundRobin or the picker
+// (if any).
 func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...balancer.SubConn) error {
 	lastErr := errors.New("received no picker")
 	for {
 		select {
 		case <-ctx.Done():
 			return fmt.Errorf("timeout when waiting for round robin picker with %v; last error: %v", want, lastErr)
-		case s := <-tcc.NewStateCh:
-			p := <-tcc.NewPickerCh
+		case p := <-tcc.NewPickerCh:
+			s := connectivity.Ready
+			select {
+			case s = <-tcc.NewStateCh:
+			default:
+			}
 			if s != connectivity.Ready {
 				lastErr = fmt.Errorf("received state %v instead of ready", s)
 				break
@@ -250,6 +252,8 @@ func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...
 			sc, err := p.Pick(balancer.PickInfo{})
 			if err != nil {
 				pickerErr = err
+			} else if sc.Done != nil {
+				sc.Done(balancer.DoneInfo{})
 			}
 			return sc.SubConn
 		}); pickerErr != nil {
@@ -264,6 +268,24 @@ func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...
 	}
 }

+// WaitForPicker waits for a picker that results in f returning nil. If the
+// context expires, returns the last error returned by f (if any).
+func (tcc *TestClientConn) WaitForPicker(ctx context.Context, f func(balancer.Picker) error) error {
+	lastErr := errors.New("received no picker")
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("timeout when waiting for picker; last error: %v", lastErr)
+		case p := <-tcc.NewPickerCh:
+			if err := f(p); err != nil {
+				lastErr = err
+				continue
+			}
+			return nil
+		}
+	}
+}
+
 // IsRoundRobin checks whether f's return value is roundrobin of elements from
 // want. But it doesn't check for the order. Note that want can contain
 // duplicate items, which makes it weight-round-robin.
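A hypothetical caller of the new WaitForPicker helper (not from the commit; assumes tcc is a *testutils.TestClientConn and ctx carries a test deadline): f is retried on every picker the balancer publishes until it returns nil or the context expires, which removes the old guesswork about which entry in NewPickerCh is the final one.

	err := tcc.WaitForPicker(ctx, func(p balancer.Picker) error {
		res, err := p.Pick(balancer.PickInfo{})
		if err != nil {
			return err // not the picker we want yet; wait for the next one
		}
		if res.Done != nil {
			res.Done(balancer.DoneInfo{}) // release any per-RPC accounting
		}
		return nil
	})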

View File

@@ -46,7 +46,7 @@ import (
 )

 const (
-	defaultTestTimeout      = 1 * time.Second
+	defaultTestTimeout      = 5 * time.Second
 	defaultShortTestTimeout = 100 * time.Microsecond

 	testClusterName = "test-cluster"
@@ -90,6 +90,9 @@ func init() {
 // TestDropByCategory verifies that the balancer correctly drops the picks, and
 // that the drops are reported.
 func (s) TestDropByCategory(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
 	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
 	xdsC := fakeclient.NewClient()
 	defer xdsC.Close()
@@ -122,9 +125,6 @@ func (s) TestDropByCategory(t *testing.T) {
 		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
 	}

-	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-
 	got, err := xdsC.WaitForReportLoad(ctx)
 	if err != nil {
 		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
@@ -136,33 +136,34 @@ func (s) TestDropByCategory(t *testing.T) {
 	sc1 := <-cc.NewSubConnCh
 	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})

 	// This should get the connecting picker.
-	p0 := <-cc.NewPickerCh
-	for i := 0; i < 10; i++ {
-		_, err := p0.Pick(balancer.PickInfo{})
-		if err != balancer.ErrNoSubConnAvailable {
-			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
-		}
+	if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
+		t.Fatal(err.Error())
 	}

 	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})

 	// Test pick with one backend.
-	p1 := <-cc.NewPickerCh
 	const rpcCount = 20
-	for i := 0; i < rpcCount; i++ {
-		gotSCSt, err := p1.Pick(balancer.PickInfo{})
-		// Even RPCs are dropped.
-		if i%2 == 0 {
-			if err == nil || !strings.Contains(err.Error(), "dropped") {
-				t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
+	if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
+		for i := 0; i < rpcCount; i++ {
+			gotSCSt, err := p.Pick(balancer.PickInfo{})
+			// Even RPCs are dropped.
+			if i%2 == 0 {
+				if err == nil || !strings.Contains(err.Error(), "dropped") {
+					return fmt.Errorf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
+				}
+				continue
+			}
+			if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
+				return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
+			}
+			if gotSCSt.Done != nil {
+				gotSCSt.Done(balancer.DoneInfo{})
 			}
-			continue
 		}
-		if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
-			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
-		}
-		if gotSCSt.Done != nil {
-			gotSCSt.Done(balancer.DoneInfo{})
-		}
+		return nil
+	}); err != nil {
+		t.Fatal(err.Error())
 	}

 	// Dump load data from the store and compare with expected counts.
@@ -210,22 +211,26 @@ func (s) TestDropByCategory(t *testing.T) {
 		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
 	}

-	p2 := <-cc.NewPickerCh
-	for i := 0; i < rpcCount; i++ {
-		gotSCSt, err := p2.Pick(balancer.PickInfo{})
-		// Even RPCs are dropped.
-		if i%4 == 0 {
-			if err == nil || !strings.Contains(err.Error(), "dropped") {
-				t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
+	if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
+		for i := 0; i < rpcCount; i++ {
+			gotSCSt, err := p.Pick(balancer.PickInfo{})
+			// Even RPCs are dropped.
+			if i%4 == 0 {
+				if err == nil || !strings.Contains(err.Error(), "dropped") {
+					return fmt.Errorf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
+				}
+				continue
+			}
+			if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
+				return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
+			}
+			if gotSCSt.Done != nil {
+				gotSCSt.Done(balancer.DoneInfo{})
 			}
-			continue
 		}
-		if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
-			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
-		}
-		if gotSCSt.Done != nil {
-			gotSCSt.Done(balancer.DoneInfo{})
-		}
+		return nil
+	}); err != nil {
+		t.Fatal(err.Error())
 	}

 	const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2
@@ -287,51 +292,52 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
 	sc1 := <-cc.NewSubConnCh
 	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})

 	// This should get the connecting picker.
-	p0 := <-cc.NewPickerCh
-	for i := 0; i < 10; i++ {
-		_, err := p0.Pick(balancer.PickInfo{})
-		if err != balancer.ErrNoSubConnAvailable {
-			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
-		}
+	if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
+		t.Fatal(err.Error())
 	}

 	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})

 	// Test pick with one backend.
-	dones := []func(){}
-	p1 := <-cc.NewPickerCh
 	const rpcCount = 100
-	for i := 0; i < rpcCount; i++ {
-		gotSCSt, err := p1.Pick(balancer.PickInfo{})
-		if i < 50 && err != nil {
-			t.Errorf("The first 50%% picks should be non-drops, got error %v", err)
-		} else if i > 50 && err == nil {
-			t.Errorf("The second 50%% picks should be drops, got error <nil>")
-		}
-		dones = append(dones, func() {
-			if gotSCSt.Done != nil {
-				gotSCSt.Done(balancer.DoneInfo{})
+	if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
+		dones := []func(){}
+		for i := 0; i < rpcCount; i++ {
+			gotSCSt, err := p.Pick(balancer.PickInfo{})
+			if i < 50 && err != nil {
+				return fmt.Errorf("The first 50%% picks should be non-drops, got error %v", err)
+			} else if i > 50 && err == nil {
+				return fmt.Errorf("The second 50%% picks should be drops, got error <nil>")
 			}
-		})
-	}
-	for _, done := range dones {
-		done()
-	}
-	dones = []func(){}
-	// Pick without drops.
-	for i := 0; i < 50; i++ {
-		gotSCSt, err := p1.Pick(balancer.PickInfo{})
-		if err != nil {
-			t.Errorf("The third 50%% picks should be non-drops, got error %v", err)
-		}
-		dones = append(dones, func() {
-			if gotSCSt.Done != nil {
-				gotSCSt.Done(balancer.DoneInfo{})
+			dones = append(dones, func() {
+				if gotSCSt.Done != nil {
+					gotSCSt.Done(balancer.DoneInfo{})
+				}
+			})
+		}
+		for _, done := range dones {
+			done()
+		}
+		dones = []func(){}
+		// Pick without drops.
+		for i := 0; i < 50; i++ {
+			gotSCSt, err := p.Pick(balancer.PickInfo{})
+			if err != nil {
+				t.Errorf("The third 50%% picks should be non-drops, got error %v", err)
 			}
-		})
-	}
-	for _, done := range dones {
-		done()
+			dones = append(dones, func() {
+				if gotSCSt.Done != nil {
+					gotSCSt.Done(balancer.DoneInfo{})
+				}
+			})
+		}
+		for _, done := range dones {
+			done()
+		}
+		return nil
+	}); err != nil {
+		t.Fatal(err.Error())
 	}

 	// Dump load data from the store and compare with expected counts.
@@ -426,6 +432,9 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) {
 // TestClusterNameInAddressAttributes covers the case that cluster name is
 // attached to the subconn address attributes.
 func (s) TestClusterNameInAddressAttributes(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
 	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
 	xdsC := fakeclient.NewClient()
 	defer xdsC.Close()
@@ -451,12 +460,8 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
 	sc1 := <-cc.NewSubConnCh
 	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})

 	// This should get the connecting picker.
-	p0 := <-cc.NewPickerCh
-	for i := 0; i < 10; i++ {
-		_, err := p0.Pick(balancer.PickInfo{})
-		if err != balancer.ErrNoSubConnAvailable {
-			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
-		}
+	if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
+		t.Fatal(err.Error())
 	}

 	addrs1 := <-cc.NewSubConnAddrsCh
@@ -470,16 +475,8 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
 	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})

 	// Test pick with one backend.
-	p1 := <-cc.NewPickerCh
-	const rpcCount = 20
-	for i := 0; i < rpcCount; i++ {
-		gotSCSt, err := p1.Pick(balancer.PickInfo{})
-		if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
-			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
-		}
-		if gotSCSt.Done != nil {
-			gotSCSt.Done(balancer.DoneInfo{})
-		}
+	if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
+		t.Fatal(err.Error())
 	}

 	const testClusterName2 = "test-cluster-2"
@@ -511,6 +508,9 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
 // TestReResolution verifies that when a SubConn turns transient failure,
 // re-resolution is triggered.
 func (s) TestReResolution(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
 	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
 	xdsC := fakeclient.NewClient()
 	defer xdsC.Close()
@@ -536,22 +536,14 @@ func (s) TestReResolution(t *testing.T) {
 	sc1 := <-cc.NewSubConnCh
 	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})

 	// This should get the connecting picker.
-	p0 := <-cc.NewPickerCh
-	for i := 0; i < 10; i++ {
-		_, err := p0.Pick(balancer.PickInfo{})
-		if err != balancer.ErrNoSubConnAvailable {
-			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
-		}
+	if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
+		t.Fatal(err.Error())
 	}

 	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})

 	// This should get the transient failure picker.
-	p1 := <-cc.NewPickerCh
-	for i := 0; i < 10; i++ {
-		_, err := p1.Pick(balancer.PickInfo{})
-		if err == nil {
-			t.Fatalf("picker.Pick, got _,%v, want not nil", err)
-		}
+	if err := cc.WaitForErrPicker(ctx); err != nil {
+		t.Fatal(err.Error())
 	}

 	// The transient failure should trigger a re-resolution.
@@ -563,20 +555,14 @@ func (s) TestReResolution(t *testing.T) {
 	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})

 	// Test pick with one backend.
-	p2 := <-cc.NewPickerCh
-	want := []balancer.SubConn{sc1}
-	if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
-		t.Fatalf("want %v, got %v", want, err)
+	if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
+		t.Fatal(err.Error())
 	}

 	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})

 	// This should get the transient failure picker.
-	p3 := <-cc.NewPickerCh
-	for i := 0; i < 10; i++ {
-		_, err := p3.Pick(balancer.PickInfo{})
-		if err == nil {
-			t.Fatalf("picker.Pick, got _,%v, want not nil", err)
-		}
+	if err := cc.WaitForErrPicker(ctx); err != nil {
+		t.Fatal(err.Error())
 	}

 	// The transient failure should trigger a re-resolution.
@@ -635,32 +621,32 @@ func (s) TestLoadReporting(t *testing.T) {
 	sc1 := <-cc.NewSubConnCh
 	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})

 	// This should get the connecting picker.
-	p0 := <-cc.NewPickerCh
-	for i := 0; i < 10; i++ {
-		_, err := p0.Pick(balancer.PickInfo{})
-		if err != balancer.ErrNoSubConnAvailable {
-			t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable)
-		}
+	if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
+		t.Fatal(err.Error())
 	}

 	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})

 	// Test pick with one backend.
-	p1 := <-cc.NewPickerCh
 	const successCount = 5
-	for i := 0; i < successCount; i++ {
-		gotSCSt, err := p1.Pick(balancer.PickInfo{})
-		if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
-			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
-		}
-		gotSCSt.Done(balancer.DoneInfo{})
-	}
 	const errorCount = 5
-	for i := 0; i < errorCount; i++ {
-		gotSCSt, err := p1.Pick(balancer.PickInfo{})
-		if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
-			t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
+	if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
+		for i := 0; i < successCount; i++ {
+			gotSCSt, err := p.Pick(balancer.PickInfo{})
+			if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
+				return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
+			}
+			gotSCSt.Done(balancer.DoneInfo{})
+		}
+		for i := 0; i < errorCount; i++ {
+			gotSCSt, err := p.Pick(balancer.PickInfo{})
+			if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
+				return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
+			}
+			gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
 		}
-		gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
+		return nil
+	}); err != nil {
+		t.Fatal(err.Error())
 	}

 	// Dump load data from the store and compare with expected counts.

View File

@@ -456,6 +456,9 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
 }

 func (s) TestEDS_CircuitBreaking(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
 	edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
 	defer cleanup()
@@ -481,43 +484,51 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) {
 	edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})

 	// Picks with drops.
-	dones := []func(){}
-	p := <-cc.NewPickerCh
-	for i := 0; i < 100; i++ {
-		pr, err := p.Pick(balancer.PickInfo{})
-		if i < 50 && err != nil {
-			t.Errorf("The first 50%% picks should be non-drops, got error %v", err)
-		} else if i > 50 && err == nil {
-			t.Errorf("The second 50%% picks should be drops, got error <nil>")
-		}
-		dones = append(dones, func() {
-			if pr.Done != nil {
-				pr.Done(balancer.DoneInfo{})
+	if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
+		dones := []func(){}
+		defer func() {
+			for _, f := range dones {
+				f()
 			}
-		})
-	}
-	for _, done := range dones {
-		done()
-	}
-	dones = []func(){}
-	// Pick without drops.
-	for i := 0; i < 50; i++ {
-		pr, err := p.Pick(balancer.PickInfo{})
-		if err != nil {
-			t.Errorf("The third 50%% picks should be non-drops, got error %v", err)
-		}
-		dones = append(dones, func() {
+		}()
+
+		for i := 0; i < 100; i++ {
+			pr, err := p.Pick(balancer.PickInfo{})
 			if pr.Done != nil {
-				pr.Done(balancer.DoneInfo{})
+				dones = append(dones, func() {
+					pr.Done(balancer.DoneInfo{})
+				})
+			}
+			if i < 50 && err != nil {
+				return fmt.Errorf("The first 50%% picks should be non-drops, got error %v", err)
+			} else if i > 50 && err == nil {
+				return fmt.Errorf("The second 50%% picks should be drops, got error <nil>")
 			}
-		})
-	}
-
-	// Without this, future tests with the same service name will fail.
-	for _, done := range dones {
-		done()
+		}
+		for _, done := range dones {
+			done()
+		}
+		dones = []func(){}
+		// Pick without drops.
+		for i := 0; i < 50; i++ {
+			pr, err := p.Pick(balancer.PickInfo{})
+			if pr.Done != nil {
+				dones = append(dones, func() {
+					pr.Done(balancer.DoneInfo{})
+				})
+			}
+			if err != nil {
+				return fmt.Errorf("The third 50%% picks should be non-drops, got error %v", err)
+			}
+		}
+		return nil
+	}); err != nil {
+		t.Fatal(err.Error())
 	}

 	// Send another update, with only circuit breaking update (and no picker
// Send another update, with only circuit breaking update (and no picker // Send another update, with only circuit breaking update (and no picker
@ -536,42 +547,48 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) {
} }
// Picks with drops. // Picks with drops.
dones = []func(){} if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
p2 := <-cc.NewPickerCh dones := []func(){}
for i := 0; i < 100; i++ { defer func() {
pr, err := p2.Pick(balancer.PickInfo{}) for _, f := range dones {
if i < 10 && err != nil { f()
t.Errorf("The first 10%% picks should be non-drops, got error %v", err)
} else if i > 10 && err == nil {
t.Errorf("The next 90%% picks should be drops, got error <nil>")
}
dones = append(dones, func() {
if pr.Done != nil {
pr.Done(balancer.DoneInfo{})
} }
}) }()
}
for _, done := range dones { for i := 0; i < 100; i++ {
done() pr, err := p.Pick(balancer.PickInfo{})
}
dones = []func(){}
// Pick without drops.
for i := 0; i < 10; i++ {
pr, err := p2.Pick(balancer.PickInfo{})
if err != nil {
t.Errorf("The next 10%% picks should be non-drops, got error %v", err)
}
dones = append(dones, func() {
if pr.Done != nil { if pr.Done != nil {
pr.Done(balancer.DoneInfo{}) dones = append(dones, func() {
pr.Done(balancer.DoneInfo{})
})
} }
}) if i < 10 && err != nil {
} return fmt.Errorf("The first 10%% picks should be non-drops, got error %v", err)
} else if i > 10 && err == nil {
return fmt.Errorf("The next 90%% picks should be drops, got error <nil>")
}
}
// Without this, future tests with the same service name will fail. for _, done := range dones {
for _, done := range dones { done()
done() }
dones = []func(){}
// Pick without drops.
for i := 0; i < 10; i++ {
pr, err := p.Pick(balancer.PickInfo{})
if pr.Done != nil {
dones = append(dones, func() {
pr.Done(balancer.DoneInfo{})
})
}
if err != nil {
return fmt.Errorf("The next 10%% picks should be non-drops, got error %v", err)
}
}
return nil
}); err != nil {
t.Fatal(err.Error())
} }
} }
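The closures above depend on a small cleanup idiom worth isolating: Done callbacks must run even when the closure bails out early with an error, or circuit-breaking counters stay pinned and later tests against the same service name inherit a saturated counter. A distilled, hypothetical helper (not part of the commit; assumes the "google.golang.org/grpc/balancer" package) showing the pattern:

	// pickAll performs n picks and guarantees that every Done callback
	// recorded along the way runs, even on an early error return, releasing
	// whatever per-RPC accounting (e.g. circuit-breaker slots) the picks took.
	func pickAll(p balancer.Picker, n int) error {
		dones := []func(){}
		defer func() {
			for _, f := range dones {
				f()
			}
		}()
		for i := 0; i < n; i++ {
			// pr is declared per iteration, so each closure captures its own copy.
			pr, err := p.Pick(balancer.PickInfo{})
			if pr.Done != nil {
				dones = append(dones, func() { pr.Done(balancer.DoneInfo{}) })
			}
			if err != nil {
				return err // the deferred loop still runs the recorded callbacks
			}
		}
		return nil
	}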

View File

@@ -69,8 +69,6 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
 	xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil)

 	select {
-	case <-cc.NewPickerCh:
-		t.Fatalf("got unexpected new picker")
 	case <-cc.NewSubConnCh:
 		t.Fatalf("got unexpected new SubConn")
 	case <-cc.RemoveSubConnCh:
@@ -78,6 +76,18 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
 		t.Fatalf("got unexpected remove SubConn")
 	case <-time.After(defaultTestShortTimeout):
 	}

+	select {
+	case p := <-cc.NewPickerCh:
+		// If we do get a new picker, ensure it is still a p1 picker.
+		if err := testutils.IsRoundRobin([]balancer.SubConn{sc1}, subConnFromPicker(p)); err != nil {
+			t.Fatal(err.Error())
+		}
+	default:
+		// No new picker; we were previously using p1 and should still be using
+		// p1, so this is okay. No need to wait for defaultTestShortTimeout
+		// since we just waited immediately above.
+	}
+
 	// Remove p2, no updates.
 	clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
 	clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
@@ -85,14 +95,25 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
 	xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil)

 	select {
-	case <-cc.NewPickerCh:
-		t.Fatalf("got unexpected new picker")
 	case <-cc.NewSubConnCh:
 		t.Fatalf("got unexpected new SubConn")
 	case <-cc.RemoveSubConnCh:
 		t.Fatalf("got unexpected remove SubConn")
 	case <-time.After(defaultTestShortTimeout):
 	}

+	select {
+	case p := <-cc.NewPickerCh:
+		// If we do get a new picker, ensure it is still a p1 picker.
+		if err := testutils.IsRoundRobin([]balancer.SubConn{sc1}, subConnFromPicker(p)); err != nil {
+			t.Fatal(err.Error())
+		}
+	default:
+		// No new picker; we were previously using p1 and should still be using
+		// p1, so this is okay. No need to wait for defaultTestShortTimeout
+		// since we just waited immediately above.
+	}
 }

 // Lower priority is used when higher priority is not ready.
@@ -147,8 +168,6 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
 	xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil)

 	select {
-	case <-cc.NewPickerCh:
-		t.Fatalf("got unexpected new picker")
 	case <-cc.NewSubConnCh:
 		t.Fatalf("got unexpected new SubConn")
 	case <-cc.RemoveSubConnCh:
@@ -156,6 +175,18 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
 		t.Fatalf("got unexpected remove SubConn")
 	case <-time.After(defaultTestShortTimeout):
 	}

+	select {
+	case p := <-cc.NewPickerCh:
+		// If we do get a new picker, ensure it is still a p1 picker.
+		if err := testutils.IsRoundRobin([]balancer.SubConn{sc1}, subConnFromPicker(p)); err != nil {
+			t.Fatal(err.Error())
+		}
+	default:
+		// No new picker; we were previously using p1 and should still be using
+		// p1, so this is okay. No need to wait for defaultTestShortTimeout
+		// since we just waited immediately above.
+	}
+
 	// Turn down 1, use 2
 	edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
 	addrs2 := <-cc.NewSubConnAddrsCh

View File

@@ -30,6 +30,8 @@ import (
 	"time"

 	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/balancer/base"
+	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/internal/balancergroup"
 	"google.golang.org/grpc/internal/buffer"
 	"google.golang.org/grpc/internal/grpclog"
@@ -53,7 +55,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
 	b := &priorityBalancer{
 		cc:                       cc,
 		done:                     grpcsync.NewEvent(),
-		childToPriority:          make(map[string]int),
 		children:                 make(map[string]*childBalancer),
 		childBalancerStateUpdate: buffer.NewUnbounded(),
 	}
@@ -90,16 +91,17 @@ type priorityBalancer struct {
 	mu         sync.Mutex
 	childInUse string
-	// priority of the child that's current in use. Int starting from 0, and 0
-	// is the higher priority.
-	priorityInUse int
 	// priorities is a list of child names from higher to lower priority.
 	priorities []string
-	// childToPriority is a map from the child name to it's priority. Priority
-	// is an int start from 0, and 0 is the higher priority.
-	childToPriority map[string]int
 	// children is a map from child name to sub-balancers.
 	children map[string]*childBalancer
+	// Set during UpdateClientConnState when calling into sub-balancers.
+	// Prevents child updates from recomputing the active priority or sending
+	// an update of the aggregated picker to the parent. Cleared after all
+	// sub-balancers have finished UpdateClientConnState, after which
+	// syncPriority is called manually.
+	inhibitPickerUpdates bool
 }
func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) error { func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
@ -111,7 +113,6 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err
addressesSplit := hierarchy.Group(s.ResolverState.Addresses) addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock()
// Create and remove children, since we know all children from the config // Create and remove children, since we know all children from the config
// are used by some priority. // are used by some priority.
for name, newSubConfig := range newConfig.Children { for name, newSubConfig := range newConfig.Children {
@@ -146,15 +147,14 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err
 		}

 		// Update config and address, but note that this doesn't send the
-		// updates to child balancer (the child balancer might not be built, if
-		// it's a low priority).
+		// updates to non-started child balancers (the child balancer might not
+		// be built, if it's a low priority).
 		currentChild.updateConfig(newSubConfig, resolver.State{
 			Addresses:     addressesSplit[name],
 			ServiceConfig: s.ResolverState.ServiceConfig,
 			Attributes:    s.ResolverState.Attributes,
 		})
 	}
-
 	// Remove child from children if it's not in new config.
 	for name, oldChild := range b.children {
 		if _, ok := newConfig.Children[name]; !ok {
@@ -164,13 +164,32 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err
 	// Update priorities and handle priority changes.
 	b.priorities = newConfig.Priorities
-	b.childToPriority = make(map[string]int, len(newConfig.Priorities))
-	for pi, pName := range newConfig.Priorities {
-		b.childToPriority[pName] = pi
+
+	// Everything was removed by the update.
+	if len(b.priorities) == 0 {
+		b.childInUse = ""
+		b.cc.UpdateState(balancer.State{
+			ConnectivityState: connectivity.TransientFailure,
+			Picker:            base.NewErrPicker(ErrAllPrioritiesRemoved),
+		})
+		b.mu.Unlock()
+		return nil
 	}
-	// Sync the states of all children to the new updated priorities. This
-	// include starting/stopping child balancers when necessary.
-	b.syncPriority(true)
+
+	// This will sync the states of all children to the new updated
+	// priorities. Includes starting/stopping child balancers when necessary.
+	// Block picker updates until all children have had a chance to call
+	// UpdateState to prevent races where, e.g., the active priority reports
+	// transient failure but a higher priority may have reported something that
+	// made it active, and if the transient failure update is handled first,
+	// RPCs could fail.
+	b.inhibitPickerUpdates = true
+	// Add an item to queue to notify us when the current items in the queue
+	// are done and syncPriority has been called.
+	done := make(chan struct{})
+	b.childBalancerStateUpdate.Put(resumePickerUpdates{done: done})
+	b.mu.Unlock()
+	<-done

 	return nil
 }
@@ -206,7 +225,7 @@ func (b *priorityBalancer) ExitIdle() {
 // UpdateState implements balancergroup.BalancerStateAggregator interface. The
 // balancer group sends new connectivity state and picker here.
 func (b *priorityBalancer) UpdateState(childName string, state balancer.State) {
-	b.childBalancerStateUpdate.Put(&childBalancerState{
+	b.childBalancerStateUpdate.Put(childBalancerState{
 		name: childName,
 		s:    state,
 	})
@@ -217,6 +236,10 @@ type childBalancerState struct {
 	s    balancer.State
 }

+type resumePickerUpdates struct {
+	done chan struct{}
+}
+
 // run handles child update in a separate goroutine, so if the child sends
 // updates inline (when called by parent), it won't cause deadlocks (by trying
 // to hold the same mutex).
@@ -225,11 +248,22 @@ func (b *priorityBalancer) run() {
 		select {
 		case u := <-b.childBalancerStateUpdate.Get():
 			b.childBalancerStateUpdate.Load()
-			s := u.(*childBalancerState)
 			// Needs to handle state update in a goroutine, because each state
 			// update needs to start/close child policy, could result in
 			// deadlock.
-			b.handleChildStateUpdate(s.name, s.s)
+			b.mu.Lock()
+			if b.done.HasFired() {
+				return
+			}
+			switch s := u.(type) {
+			case childBalancerState:
+				b.handleChildStateUpdate(s.name, s.s)
+			case resumePickerUpdates:
+				b.inhibitPickerUpdates = false
+				b.syncPriority("")
+				close(s.done)
+			}
+			b.mu.Unlock()
 		case <-b.done.Done():
 			return
 		}
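The queueing trick above is the heart of the fix: UpdateClientConnState enqueues a resumePickerUpdates sentinel into the same unbounded buffer the child state updates flow through, then blocks on its done channel. Because a single goroutine drains the queue in FIFO order, every child update enqueued before the sentinel has been processed by the time the sentinel is handled, so picker updates can be re-enabled and syncPriority run exactly once. A self-contained sketch of the idea (hypothetical types; the real code uses buffer.Unbounded and the balancer's mutex):

	package main

	import "fmt"

	type update struct{ name string }
	type resume struct{ done chan struct{} }

	func main() {
		q := make(chan interface{}, 10) // stand-in for buffer.Unbounded
		go func() {
			inhibited := true
			for item := range q {
				switch v := item.(type) {
				case update:
					fmt.Println("buffered child update:", v.name, "inhibited:", inhibited)
				case resume:
					inhibited = false // equivalent of b.inhibitPickerUpdates = false
					close(v.done)     // unblock the UpdateClientConnState caller
				}
			}
		}()

		q <- update{name: "child-0"} // updates produced while inhibited
		q <- update{name: "child-1"}
		done := make(chan struct{})
		q <- resume{done: done} // sentinel enqueued last
		<-done                  // returns only after both updates were handled
		fmt.Println("picker updates resumed")
	}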

View File

@@ -44,7 +44,8 @@ type childBalancer struct {
 	// will be restarted if the child has not reported TF more recently than it
 	// reported Ready or Idle.
 	reportedTF bool
-	state      balancer.State
+	// The latest state the child balancer provided.
+	state balancer.State
 	// The timer to give a priority some time to connect. And if the priority
 	// doesn't go into Ready/Failure, the next priority will be started.
 	initTimer *timerWrapper
@@ -74,11 +75,14 @@ func (cb *childBalancer) updateBuilder(bb balancer.Builder) {
 }

 // updateConfig sets childBalancer's config and state, but doesn't send update to
-// the child balancer.
+// the child balancer unless it is started.
 func (cb *childBalancer) updateConfig(child *Child, rState resolver.State) {
 	cb.ignoreReresolutionRequests = child.IgnoreReresolutionRequests
 	cb.config = child.Config.Config
 	cb.rState = rState
+	if cb.started {
+		cb.sendUpdate()
+	}
 }

 // start builds the child balancer if it's not already started.
@@ -91,6 +95,7 @@ func (cb *childBalancer) start() {
 	cb.started = true
 	cb.parent.bg.Add(cb.name, cb.bb)
 	cb.startInitTimer()
+	cb.sendUpdate()
 }

 // sendUpdate sends the addresses and config to the child balancer.
@@ -145,7 +150,7 @@ func (cb *childBalancer) startInitTimer() {
 		// Re-sync the priority. This will switch to the next priority if
 		// there's any. Note that it's important sync() is called after setting
 		// initTimer to nil.
-		cb.parent.syncPriority(false)
+		cb.parent.syncPriority("")
 	})
 }

View File

@@ -23,7 +23,6 @@ import (
 	"time"

 	"google.golang.org/grpc/balancer"
-	"google.golang.org/grpc/balancer/base"
 	"google.golang.org/grpc/connectivity"
 )
@@ -59,7 +58,7 @@ var (
 //   - If balancer is Connecting and has non-nil initTimer (meaning it
 //     transitioned from Ready or Idle to connecting, not from TF, so we
 //     should give it init-time to connect).
-//   - If balancer is READY
+//   - If balancer is READY or IDLE
 //   - If this is the lowest priority
 // - do the following:
 //   - if this is not the old childInUse, override picker so old picker is no
@@ -68,18 +67,10 @@ var (
 //   - forward the new addresses and config
 //
 // Caller must hold b.mu.
-func (b *priorityBalancer) syncPriority(forceUpdate bool) {
-	// Everything was removed by the update.
-	if len(b.priorities) == 0 {
-		b.childInUse = ""
-		b.priorityInUse = 0
-		b.cc.UpdateState(balancer.State{
-			ConnectivityState: connectivity.TransientFailure,
-			Picker:            base.NewErrPicker(ErrAllPrioritiesRemoved),
-		})
+func (b *priorityBalancer) syncPriority(childUpdating string) {
+	if b.inhibitPickerUpdates {
 		return
 	}
-
 	for p, name := range b.priorities {
 		child, ok := b.children[name]
 		if !ok {
@@ -92,23 +83,14 @@ func (b *priorityBalancer) syncPriority(childUpdating string) {
 			child.state.ConnectivityState == connectivity.Idle ||
 			(child.state.ConnectivityState == connectivity.Connecting && child.initTimer != nil) ||
 			p == len(b.priorities)-1 {
-			if b.childInUse != "" && b.childInUse != child.name {
-				// childInUse was set and is different from this child, will
-				// change childInUse later. We need to update picker here
-				// immediately so parent stops using the old picker.
+			if b.childInUse != child.name || child.name == childUpdating {
+				logger.Warningf("ciu, cn, cu: %v, %v, %v", b.childInUse, child.name, childUpdating)
+				// If we switch children or the child in use just updated its
+				// picker, push the child's picker to the parent.
 				b.cc.UpdateState(child.state)
 			}
 			b.logger.Infof("switching to (%q, %v) in syncPriority", child.name, p)
-			oldChildInUse := b.childInUse
 			b.switchToChild(child, p)
-			if b.childInUse != oldChildInUse || forceUpdate {
-				// If child is switched, send the update to the new child.
-				//
-				// Or if forceUpdate is true (when this is triggered by a
-				// ClientConn update), because the ClientConn update might
-				// contain changes for this child.
-				child.sendUpdate()
-			}
 			break
 		}
 	}
@@ -163,7 +145,6 @@ func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) {
 		return
 	}
 	b.childInUse = child.name
-	b.priorityInUse = priority

 	if !child.started {
 		child.start()
@@ -173,40 +154,13 @@ func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) {
 // handleChildStateUpdate start/close priorities based on the connectivity
 // state.
 func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.State) {
-	b.mu.Lock()
-	defer b.mu.Unlock()
-	if b.done.HasFired() {
-		return
-	}
-
-	priority, ok := b.childToPriority[childName]
-	if !ok {
-		b.logger.Warningf("priority: received picker update with unknown child %v", childName)
-		return
-	}
-
-	if b.childInUse == "" {
-		b.logger.Warningf("priority: no child is in use when picker update is received")
-		return
-	}
-
-	// priorityInUse is higher than this priority.
-	if b.priorityInUse < priority {
-		// Lower priorities should all be closed, this is an unexpected update.
-		// Can happen if the child policy sends an update after we tell it to
-		// close.
-		b.logger.Warningf("priority: received picker update from priority %v, lower than priority in use %v", priority, b.priorityInUse)
-		return
-	}
-
 	// Update state in child. The updated picker will be sent to parent later if
 	// necessary.
 	child, ok := b.children[childName]
 	if !ok {
-		b.logger.Warningf("priority: child balancer not found for child %v, priority %v", childName, priority)
+		b.logger.Warningf("priority: child balancer not found for child %v", childName)
 		return
 	}
-	oldChildState := child.state
 	child.state = s

 	// We start/stop the init timer of this child based on the new connectivity
@@ -227,36 +181,5 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S
 		// New state is Shutdown, should never happen. Don't forward.
 	}

-	oldPriorityInUse := b.priorityInUse
-	child.parent.syncPriority(false)
-	// If child is switched by syncPriority(), it also sends the update from the
-	// new child to overwrite the old picker used by the parent.
-	//
-	// But no update is sent if the child is not switches. That means if this
-	// update is from childInUse, and this child is still childInUse after
-	// syncing, the update being handled here is not sent to the parent. In that
-	// case, we need to do an explicit check here to forward the update.
-	if b.priorityInUse == oldPriorityInUse && b.priorityInUse == priority {
-		// Special handling for Connecting. If child was not switched, and this
-		// is a Connecting->Connecting transition, do not send the redundant
-		// update, since all Connecting pickers are the same (they tell the RPCs
-		// to repick).
-		//
-		// This can happen because the initial state of a child (before any
-		// update is received) is Connecting. When the child is started, it's
-		// picker is sent to the parent by syncPriority (to overwrite the old
-		// picker if there's any). When it reports Connecting after being
-		// started, it will send a Connecting update (handled here), causing a
-		// Connecting->Connecting transition.
-		if oldChildState.ConnectivityState == connectivity.Connecting && s.ConnectivityState == connectivity.Connecting {
-			return
-		}
-		// Only forward this update if sync() didn't switch child, and this
-		// child is in use.
-		//
-		// sync() forwards the update if the child was switched, so there's no
-		// need to forward again.
-		b.cc.UpdateState(child.state)
-	}
+	child.parent.syncPriority(childName)
 }
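The net effect of the syncPriority rewrite: the old forward-if-priority-unchanged bookkeeping (priorityInUse, childToPriority, the Connecting-to-Connecting suppression) collapses into a single rule applied at the point where a child is selected. A simplified restatement with hypothetical names (the real check is the `b.childInUse != child.name || child.name == childUpdating` condition above):

	// shouldPushPicker reports whether the parent ClientConn's picker must be
	// refreshed: either the selected child differs from the one currently in
	// use (a priority switch), or the selected child is the one whose update
	// is being processed (its picker changed in place).
	func shouldPushPicker(childInUse, selected, childUpdating string) bool {
		return childInUse != selected || selected == childUpdating
	}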

View File

@@ -149,8 +149,6 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) {
 	}

 	select {
-	case <-cc.NewPickerCh:
-		t.Fatalf("got unexpected new picker")
 	case sc := <-cc.NewSubConnCh:
 		t.Fatalf("got unexpected new SubConn: %s", sc)
 	case sc := <-cc.RemoveSubConnCh:
@@ -1886,3 +1884,125 @@ func (s) TestPriority_AddLowPriorityWhenHighIsInIdle(t *testing.T) {
 		t.Fatalf("got unexpected call to NewSubConn with addr: %v, want %v", addrsNew, want)
 	}
 }
+
+// Lower priority is used when higher priority is not ready; higher priority
+// still gets updates.
+//
+// Init 0 and 1; 0 is down, 1 is up, use 1; update 0; 0 is up, use 0
+func (s) TestPriority_HighPriorityUpdatesWhenLowInUse(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	cc := testutils.NewTestClientConn(t)
+	bb := balancer.Get(Name)
+	pb := bb.Build(cc, balancer.BuildOptions{})
+	defer pb.Close()
+
+	t.Log("Two localities, with priorities [0, 1], each with one backend.")
+	if err := pb.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{
+			Addresses: []resolver.Address{
+				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
+				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
+			},
+		},
+		BalancerConfig: &LBConfig{
+			Children: map[string]*Child{
+				"child-0": {Config: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}},
+				"child-1": {Config: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}},
+			},
+			Priorities: []string{"child-0", "child-1"},
+		},
+	}); err != nil {
+		t.Fatalf("failed to update ClientConn state: %v", err)
+	}
+
+	addrs0 := <-cc.NewSubConnAddrsCh
+	if got, want := addrs0[0].Addr, testBackendAddrStrs[0]; got != want {
+		t.Fatalf("sc is created with addr %v, want %v", got, want)
+	}
+	sc0 := <-cc.NewSubConnCh
+
+	t.Log("Make p0 fail.")
+	pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+
+	// Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs
+	// will retry.
+	if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	t.Log("Make p1 ready.")
+	addrs1 := <-cc.NewSubConnAddrsCh
+	if got, want := addrs1[0].Addr, testBackendAddrStrs[1]; got != want {
+		t.Fatalf("sc is created with addr %v, want %v", got, want)
+	}
+	sc1 := <-cc.NewSubConnCh
+	pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+
+	// Test pick with 1.
+	if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	// Does not change the aggregate state, because round robin does not leave
+	// TRANSIENT_FAILURE if a subconn goes CONNECTING.
+	pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+
+	if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	t.Log("Change p0 to use new address.")
+	if err := pb.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{
+			Addresses: []resolver.Address{
+				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[2]}, []string{"child-0"}),
+				hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[3]}, []string{"child-1"}),
+			},
+		},
+		BalancerConfig: &LBConfig{
+			Children: map[string]*Child{
+				"child-0": {Config: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}},
+				"child-1": {Config: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}},
+			},
+			Priorities: []string{"child-0", "child-1"},
+		},
+	}); err != nil {
+		t.Fatalf("failed to update ClientConn state: %v", err)
+	}
+
+	// Two new subconns are created by the previous update; one by p0 and one
+	// by p1. They don't happen concurrently, but they could happen in any
+	// order.
+	t.Log("Make p0 and p1 both ready; p0 should be used.")
+	var sc2, sc3 balancer.SubConn
+	for i := 0; i < 2; i++ {
+		addr := <-cc.NewSubConnAddrsCh
+		sc := <-cc.NewSubConnCh
+		switch addr[0].Addr {
+		case testBackendAddrStrs[2]:
+			sc2 = sc
+		case testBackendAddrStrs[3]:
+			sc3 = sc
+		default:
+			t.Fatalf("sc is created with addr %v, want %v or %v", addr[0].Addr, testBackendAddrStrs[2], testBackendAddrStrs[3])
+		}
+		pb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+		pb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	}
+	if sc2 == nil {
+		t.Fatalf("sc not created with addr %v", testBackendAddrStrs[2])
+	}
+	if sc3 == nil {
+		t.Fatalf("sc not created with addr %v", testBackendAddrStrs[3])
+	}
+
+	// Test pick with 0.
+	if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil {
+		t.Fatal(err.Error())
+	}
+}