grpc-go/xds/internal/balancer/clusterimpl/balancer_test.go

788 lines
26 KiB
Go

/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package clusterimpl
import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpctest"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)
const (
defaultTestTimeout = 5 * time.Second
defaultShortTestTimeout = 100 * time.Microsecond
testClusterName = "test-cluster"
testServiceName = "test-eds-service"
)
var (
testBackendAddrs = []resolver.Address{
{Addr: "1.1.1.1:1"},
}
testLRSServerConfig = &bootstrap.ServerConfig{
ServerURI: "trafficdirector.googleapis.com:443",
CredsType: "google_default",
}
cmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(load.Data{}, "ReportInterval"),
}
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
func init() {
NewRandomWRR = testutils.NewTestWRR
}
// TestDropByCategory verifies that the balancer correctly drops the picks, and
// that the drops are reported.
func (s) TestDropByCategory(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
const (
dropReason = "test-dropping-category"
dropNumerator = 1
dropDenominator = 2
)
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
DropCategories: []DropConfig{{
Category: dropReason,
RequestsPerMillion: million * dropNumerator / dropDenominator,
}},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
got, err := xdsC.WaitForReportLoad(ctx)
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerConfig {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
}
sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
const rpcCount = 20
if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
for i := 0; i < rpcCount; i++ {
gotSCSt, err := p.Pick(balancer.PickInfo{})
// Even RPCs are dropped.
if i%2 == 0 {
if err == nil || !strings.Contains(err.Error(), "dropped") {
return fmt.Errorf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
}
continue
}
if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
}
if gotSCSt.Done != nil {
gotSCSt.Done(balancer.DoneInfo{})
}
}
return nil
}); err != nil {
t.Fatal(err.Error())
}
// Dump load data from the store and compare with expected counts.
loadStore := xdsC.LoadStore()
if loadStore == nil {
t.Fatal("loadStore is nil in xdsClient")
}
const dropCount = rpcCount * dropNumerator / dropDenominator
wantStatsData0 := []*load.Data{{
Cluster: testClusterName,
Service: testServiceName,
TotalDrops: dropCount,
Drops: map[string]uint64{dropReason: dropCount},
LocalityStats: map[string]load.LocalityData{
assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount}},
},
}}
gotStatsData0 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
}
// Send an update with new drop configs.
const (
dropReason2 = "test-dropping-category-2"
dropNumerator2 = 1
dropDenominator2 = 4
)
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
DropCategories: []DropConfig{{
Category: dropReason2,
RequestsPerMillion: million * dropNumerator2 / dropDenominator2,
}},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
for i := 0; i < rpcCount; i++ {
gotSCSt, err := p.Pick(balancer.PickInfo{})
// Even RPCs are dropped.
if i%4 == 0 {
if err == nil || !strings.Contains(err.Error(), "dropped") {
return fmt.Errorf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
}
continue
}
if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
}
if gotSCSt.Done != nil {
gotSCSt.Done(balancer.DoneInfo{})
}
}
return nil
}); err != nil {
t.Fatal(err.Error())
}
const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2
wantStatsData1 := []*load.Data{{
Cluster: testClusterName,
Service: testServiceName,
TotalDrops: dropCount2,
Drops: map[string]uint64{dropReason2: dropCount2},
LocalityStats: map[string]load.LocalityData{
assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount2}},
},
}}
gotStatsData1 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData1, wantStatsData1, cmpOpts); diff != "" {
t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
}
}
// TestDropCircuitBreaking verifies that the balancer correctly drops the picks
// due to circuit breaking, and that the drops are reported.
func (s) TestDropCircuitBreaking(t *testing.T) {
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
var maxRequest uint32 = 50
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
MaxConcurrentRequests: &maxRequest,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
got, err := xdsC.WaitForReportLoad(ctx)
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerConfig {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
}
sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
const rpcCount = 100
if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
dones := []func(){}
for i := 0; i < rpcCount; i++ {
gotSCSt, err := p.Pick(balancer.PickInfo{})
if i < 50 && err != nil {
return fmt.Errorf("The first 50%% picks should be non-drops, got error %v", err)
} else if i > 50 && err == nil {
return fmt.Errorf("The second 50%% picks should be drops, got error <nil>")
}
dones = append(dones, func() {
if gotSCSt.Done != nil {
gotSCSt.Done(balancer.DoneInfo{})
}
})
}
for _, done := range dones {
done()
}
dones = []func(){}
// Pick without drops.
for i := 0; i < 50; i++ {
gotSCSt, err := p.Pick(balancer.PickInfo{})
if err != nil {
t.Errorf("The third 50%% picks should be non-drops, got error %v", err)
}
dones = append(dones, func() {
if gotSCSt.Done != nil {
gotSCSt.Done(balancer.DoneInfo{})
}
})
}
for _, done := range dones {
done()
}
return nil
}); err != nil {
t.Fatal(err.Error())
}
// Dump load data from the store and compare with expected counts.
loadStore := xdsC.LoadStore()
if loadStore == nil {
t.Fatal("loadStore is nil in xdsClient")
}
wantStatsData0 := []*load.Data{{
Cluster: testClusterName,
Service: testServiceName,
TotalDrops: uint64(maxRequest),
LocalityStats: map[string]load.LocalityData{
assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: uint64(rpcCount - maxRequest + 50)}},
},
}}
gotStatsData0 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
t.Fatalf("got unexpected drop reports, diff (-got, +want): %v", diff)
}
}
// TestPickerUpdateAfterClose covers the case where a child policy sends a
// picker update after the cluster_impl policy is closed. Because picker updates
// are handled in the run() goroutine, which exits before Close() returns, we
// expect the above picker update to be dropped.
func (s) TestPickerUpdateAfterClose(t *testing.T) {
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
// Create a stub balancer which waits for the cluster_impl policy to be
// closed before sending a picker update (upon receipt of a subConn state
// change).
closeCh := make(chan struct{})
const childPolicyName = "stubBalancer-TestPickerUpdateAfterClose"
stub.Register(childPolicyName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
// Create a subConn which will be used later on to test the race
// between UpdateSubConnState() and Close().
bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, _ balancer.SubConn, _ balancer.SubConnState) {
go func() {
// Wait for Close() to be called on the parent policy before
// sending the picker update.
<-closeCh
bd.ClientConn.UpdateState(balancer.State{
Picker: base.NewErrPicker(errors.New("dummy error picker")),
})
}()
},
})
var maxRequest uint32 = 50
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
MaxConcurrentRequests: &maxRequest,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: childPolicyName,
},
},
}); err != nil {
b.Close()
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
// Send a subConn state change to trigger a picker update. The stub balancer
// that we use as the child policy will not send a picker update until the
// parent policy is closed.
sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.Close()
close(closeCh)
select {
case <-cc.NewPickerCh:
t.Fatalf("unexpected picker update after balancer is closed")
case <-time.After(defaultShortTestTimeout):
}
}
// TestClusterNameInAddressAttributes covers the case that cluster name is
// attached to the subconn address attributes.
func (s) TestClusterNameInAddressAttributes(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testBackendAddrs[0].Addr; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
cn, ok := internal.GetXDSHandshakeClusterName(addrs1[0].Attributes)
if !ok || cn != testClusterName {
t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn, ok, testClusterName)
}
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
t.Fatal(err.Error())
}
const testClusterName2 = "test-cluster-2"
var addr2 = resolver.Address{Addr: "2.2.2.2"}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: []resolver.Address{addr2}}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName2,
EDSServiceName: testServiceName,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, addr2.Addr; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
// New addresses should have the new cluster name.
cn2, ok := internal.GetXDSHandshakeClusterName(addrs2[0].Attributes)
if !ok || cn2 != testClusterName2 {
t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn2, ok, testClusterName2)
}
}
// TestReResolution verifies that when a SubConn turns transient failure,
// re-resolution is triggered.
func (s) TestReResolution(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// This should get the transient failure picker.
if err := cc.WaitForErrPicker(ctx); err != nil {
t.Fatal(err.Error())
}
// The transient failure should trigger a re-resolution.
select {
case <-cc.ResolveNowCh:
case <-time.After(defaultTestTimeout):
t.Fatalf("timeout waiting for ResolveNow()")
}
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
t.Fatal(err.Error())
}
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// This should get the transient failure picker.
if err := cc.WaitForErrPicker(ctx); err != nil {
t.Fatal(err.Error())
}
// The transient failure should trigger a re-resolution.
select {
case <-cc.ResolveNowCh:
case <-time.After(defaultTestTimeout):
t.Fatalf("timeout waiting for ResolveNow()")
}
}
func (s) TestLoadReporting(t *testing.T) {
var testLocality = xdsinternal.LocalityID{
Region: "test-region",
Zone: "test-zone",
SubZone: "test-sub-zone",
}
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
addrs := make([]resolver.Address, len(testBackendAddrs))
for i, a := range testBackendAddrs {
addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
// Locality: testLocality,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
got, err := xdsC.WaitForReportLoad(ctx)
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerConfig {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
}
sc1 := <-cc.NewSubConnCh
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
// This should get the connecting picker.
if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
t.Fatal(err.Error())
}
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
const successCount = 5
const errorCount = 5
if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
for i := 0; i < successCount; i++ {
gotSCSt, err := p.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
}
gotSCSt.Done(balancer.DoneInfo{})
}
for i := 0; i < errorCount; i++ {
gotSCSt, err := p.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
}
gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
}
return nil
}); err != nil {
t.Fatal(err.Error())
}
// Dump load data from the store and compare with expected counts.
loadStore := xdsC.LoadStore()
if loadStore == nil {
t.Fatal("loadStore is nil in xdsClient")
}
sds := loadStore.Stats([]string{testClusterName})
if len(sds) == 0 {
t.Fatalf("loads for cluster %v not found in store", testClusterName)
}
sd := sds[0]
if sd.Cluster != testClusterName || sd.Service != testServiceName {
t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
}
testLocalityJSON, _ := testLocality.ToString()
localityData, ok := sd.LocalityStats[testLocalityJSON]
if !ok {
t.Fatalf("loads for %v not found in store", testLocality)
}
reqStats := localityData.RequestStats
if reqStats.Succeeded != successCount {
t.Errorf("got succeeded %v, want %v", reqStats.Succeeded, successCount)
}
if reqStats.Errored != errorCount {
t.Errorf("got errord %v, want %v", reqStats.Errored, errorCount)
}
if reqStats.InProgress != 0 {
t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0)
}
b.Close()
if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
}
}
// TestUpdateLRSServer covers the cases
// - the init config specifies "" as the LRS server
// - config modifies LRS server to a different string
// - config sets LRS server to nil to stop load reporting
func (s) TestUpdateLRSServer(t *testing.T) {
var testLocality = xdsinternal.LocalityID{
Region: "test-region",
Zone: "test-zone",
SubZone: "test-sub-zone",
}
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()
addrs := make([]resolver.Address, len(testBackendAddrs))
for i, a := range testBackendAddrs {
addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
got, err := xdsC.WaitForReportLoad(ctx)
if err != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
}
if got.Server != testLRSServerConfig {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
}
testLRSServerConfig2 := &bootstrap.ServerConfig{
ServerURI: "trafficdirector-another.googleapis.com:443",
CredsType: "google_default",
}
// Update LRS server to a different name.
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig2,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
}
got2, err2 := xdsC.WaitForReportLoad(ctx)
if err2 != nil {
t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2)
}
if got2.Server != testLRSServerConfig2 {
t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerConfig2)
}
// Update LRS server to nil, to disable LRS.
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
}
if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
}
shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultShortTestTimeout)
defer shortCancel()
if s, err := xdsC.WaitForReportLoad(shortCtx); err != context.DeadlineExceeded {
t.Fatalf("unexpected load report to server: %q", s)
}
}
func assertString(f func() (string, error)) string {
s, err := f()
if err != nil {
panic(err.Error())
}
return s
}