xds: move balancergroup and weightedtarget our of xds directory (#4966)

This commit is contained in:
Easwar Swaminathan 2021-11-09 11:59:10 -08:00 committed by GitHub
parent 14ebd917f2
commit 714ba8d517
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 1867 additions and 1372 deletions

View File

@ -17,6 +17,8 @@
*/
// Package weightedtarget implements the weighted_target balancer.
//
// All APIs in this package are experimental.
package weightedtarget
import (
@ -24,14 +26,14 @@ import (
"fmt"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
)
// Name is the name of the weighted_target balancer.
@ -69,11 +71,6 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
type weightedTargetBalancer struct {
logger *grpclog.PrefixLogger
// TODO: Make this package not dependent on any xds specific code.
// BalancerGroup uses xdsinternal.LocalityID as the key in the map of child
// policies that it maintains and reports load using LRS. Once these two
// dependencies are removed from the balancerGroup, this package will not
// have any dependencies on xds code.
bg *balancergroup.BalancerGroup
stateAggregator *weightedaggregator.Aggregator

View File

@ -23,34 +23,38 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/balancer/grpclb"
"google.golang.org/grpc/balancer/roundrobin"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/priority"
)
const (
testJSONConfig = `{
"targets": {
"cluster_1" : {
"weight":75,
"childPolicy":[{"priority_experimental":{"priorities": ["child-1"], "children": {"child-1": {"config": [{"round_robin":{}}]}}}}]
"cluster_1": {
"weight": 75,
"childPolicy": [{
"grpclb": {
"childPolicy": [{"pick_first":{}}],
"targetName": "foo-service"
}
}]
},
"cluster_2" : {
"weight":25,
"childPolicy":[{"priority_experimental":{"priorities": ["child-2"], "children": {"child-2": {"config": [{"round_robin":{}}]}}}}]
"cluster_2": {
"weight": 25,
"childPolicy": [{"round_robin": ""}]
}
}
}`
)
var (
testConfigParser = balancer.Get(priority.Name).(balancer.ConfigParser)
testConfigJSON1 = `{"priorities": ["child-1"], "children": {"child-1": {"config": [{"round_robin":{}}]}}}`
testConfig1, _ = testConfigParser.ParseConfig([]byte(testConfigJSON1))
testConfigJSON2 = `{"priorities": ["child-2"], "children": {"child-2": {"config": [{"round_robin":{}}]}}}`
testConfig2, _ = testConfigParser.ParseConfig([]byte(testConfigJSON2))
grpclbConfigParser = balancer.Get("grpclb").(balancer.ConfigParser)
grpclbConfigJSON = `{"childPolicy": [{"pick_first":{}}], "targetName": "foo-service"}`
grpclbConfig, _ = grpclbConfigParser.ParseConfig([]byte(grpclbConfigJSON))
)
func Test_parseConfig(t *testing.T) {
func (s) TestParseConfig(t *testing.T) {
tests := []struct {
name string
js string
@ -71,15 +75,14 @@ func Test_parseConfig(t *testing.T) {
"cluster_1": {
Weight: 75,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: priority.Name,
Config: testConfig1,
Name: "grpclb",
Config: grpclbConfig,
},
},
"cluster_2": {
Weight: 25,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: priority.Name,
Config: testConfig2,
Name: roundrobin.Name,
},
},
},
@ -91,8 +94,7 @@ func Test_parseConfig(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
got, err := parseConfig([]byte(tt.js))
if (err != nil) != tt.wantErr {
t.Errorf("parseConfig() error = %v, wantErr %v", err, tt.wantErr)
return
t.Fatalf("parseConfig() error = %v, wantErr %v", err, tt.wantErr)
}
if !cmp.Equal(got, tt.want) {
t.Errorf("parseConfig() got unexpected result, diff: %v", cmp.Diff(got, tt.want))

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,511 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package balancergroup
import (
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
)
var (
rrBuilder = balancer.Get(roundrobin.Name)
testBalancerIDs = []string{"b1", "b2", "b3"}
testBackendAddrs []resolver.Address
)
const testBackendAddrsCount = 12
func init() {
for i := 0; i < testBackendAddrsCount; i++ {
testBackendAddrs = append(testBackendAddrs, resolver.Address{Addr: fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)})
}
// Disable caching for all tests. It will be re-enabled in caching specific
// tests.
DefaultSubBalancerCloseTimeout = time.Millisecond
}
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
return func() balancer.SubConn {
scst, _ := p.Pick(balancer.PickInfo{})
return scst.SubConn
}
}
// Create a new balancer group, add balancer and backends, but not start.
// - b1, weight 2, backends [0,1]
// - b2, weight 1, backends [2,3]
// Start the balancer group and check behavior.
//
// Close the balancer group, call add/remove/change weight/change address.
// - b2, weight 3, backends [0,3]
// - b3, weight 1, backends [1,2]
// Start the balancer group again and check for behavior.
func (s) TestBalancerGroup_start_close(t *testing.T) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, balancer.BuildOptions{}, gator, nil)
// Add two balancers to group and send two resolved addresses to both
// balancers.
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
m1 := make(map[resolver.Address]balancer.SubConn)
for i := 0; i < 4; i++ {
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m1[addrs[0]] = sc
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{
m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
gator.Stop()
bg.Close()
for i := 0; i < 4; i++ {
bg.UpdateSubConnState(<-cc.RemoveSubConnCh, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
}
// Add b3, weight 1, backends [1,2].
gator.Add(testBalancerIDs[2], 1)
bg.Add(testBalancerIDs[2], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:3]}})
// Remove b1.
gator.Remove(testBalancerIDs[0])
bg.Remove(testBalancerIDs[0])
// Update b2 to weight 3, backends [0,3].
gator.UpdateWeight(testBalancerIDs[1], 3)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3])}})
gator.Start()
bg.Start()
m2 := make(map[resolver.Address]balancer.SubConn)
for i := 0; i < 4; i++ {
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m2[addrs[0]] = sc
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
// Test roundrobin on the last picker.
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{
m2[testBackendAddrs[0]], m2[testBackendAddrs[0]], m2[testBackendAddrs[0]],
m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], m2[testBackendAddrs[3]],
m2[testBackendAddrs[1]], m2[testBackendAddrs[2]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// Test that balancer group start() doesn't deadlock if the balancer calls back
// into balancer group inline when it gets an update.
//
// The potential deadlock can happen if we
// - hold a lock and send updates to balancer (e.g. update resolved addresses)
// - the balancer calls back (NewSubConn or update picker) in line
// The callback will try to hold hte same lock again, which will cause a
// deadlock.
//
// This test starts the balancer group with a test balancer, will updates picker
// whenever it gets an address update. It's expected that start() doesn't block
// because of deadlock.
func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
const balancerName = "stub-TestBalancerGroup_start_close_deadlock"
stub.Register(balancerName, stub.BalancerFuncs{})
builder := balancer.Get(balancerName)
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, balancer.BuildOptions{}, gator, nil)
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], builder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], builder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
}
func replaceDefaultSubBalancerCloseTimeout(n time.Duration) func() {
old := DefaultSubBalancerCloseTimeout
DefaultSubBalancerCloseTimeout = n
return func() { DefaultSubBalancerCloseTimeout = old }
}
// initBalancerGroupForCachingTest creates a balancer group, and initialize it
// to be ready for caching tests.
//
// Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer
// is removed later, so the balancer group returned has one sub-balancer in its
// own map, and one sub-balancer in cache.
func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.TestClientConn, map[resolver.Address]balancer.SubConn) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, balancer.BuildOptions{}, gator, nil)
// Add two balancers to group and send two resolved addresses to both
// balancers.
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
m1 := make(map[resolver.Address]balancer.SubConn)
for i := 0; i < 4; i++ {
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m1[addrs[0]] = sc
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{
m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
gator.Remove(testBalancerIDs[1])
bg.Remove(testBalancerIDs[1])
gator.BuildAndUpdate()
// Don't wait for SubConns to be removed after close, because they are only
// removed after close timeout.
for i := 0; i < 10; i++ {
select {
case <-cc.RemoveSubConnCh:
t.Fatalf("Got request to remove subconn, want no remove subconn (because subconns were still in cache)")
default:
}
time.Sleep(time.Millisecond)
}
// Test roundrobin on the with only sub-balancer0.
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{
m1[testBackendAddrs[0]], m1[testBackendAddrs[1]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
return gator, bg, cc, m1
}
// Test that if a sub-balancer is removed, and re-added within close timeout,
// the subConns won't be re-created.
func (s) TestBalancerGroup_locality_caching(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
// Turn down subconn for addr2, shouldn't get picker update because
// sub-balancer1 was removed.
bg.UpdateSubConnState(addrToSC[testBackendAddrs[2]], balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
for i := 0; i < 10; i++ {
select {
case <-cc.NewPickerCh:
t.Fatalf("Got new picker, want no new picker (because the sub-balancer was removed)")
default:
}
time.Sleep(time.Millisecond)
}
// Sleep, but sleep less then close timeout.
time.Sleep(time.Millisecond * 100)
// Re-add sub-balancer-1, because subconns were in cache, no new subconns
// should be created. But a new picker will still be generated, with subconn
// states update to date.
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
p3 := <-cc.NewPickerCh
want := []balancer.SubConn{
addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]],
addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]],
// addr2 is down, b2 only has addr3 in READY state.
addrToSC[testBackendAddrs[3]], addrToSC[testBackendAddrs[3]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
for i := 0; i < 10; i++ {
select {
case <-cc.NewSubConnAddrsCh:
t.Fatalf("Got new subconn, want no new subconn (because subconns were still in cache)")
default:
}
time.Sleep(time.Millisecond * 10)
}
}
// Sub-balancers are put in cache when they are removed. If balancer group is
// closed within close timeout, all subconns should still be rmeoved
// immediately.
func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
_, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
bg.Close()
// The balancer group is closed. The subconns should be removed immediately.
removeTimeout := time.After(time.Millisecond * 500)
scToRemove := map[balancer.SubConn]int{
addrToSC[testBackendAddrs[0]]: 1,
addrToSC[testBackendAddrs[1]]: 1,
addrToSC[testBackendAddrs[2]]: 1,
addrToSC[testBackendAddrs[3]]: 1,
}
for i := 0; i < len(scToRemove); i++ {
select {
case sc := <-cc.RemoveSubConnCh:
c := scToRemove[sc]
if c == 0 {
t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
}
scToRemove[sc] = c - 1
case <-removeTimeout:
t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
}
}
}
// Sub-balancers in cache will be closed if not re-added within timeout, and
// subConns will be removed.
func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(time.Second)()
_, _, cc, addrToSC := initBalancerGroupForCachingTest(t)
// The sub-balancer is not re-added within timeout. The subconns should be
// removed.
removeTimeout := time.After(DefaultSubBalancerCloseTimeout)
scToRemove := map[balancer.SubConn]int{
addrToSC[testBackendAddrs[2]]: 1,
addrToSC[testBackendAddrs[3]]: 1,
}
for i := 0; i < len(scToRemove); i++ {
select {
case sc := <-cc.RemoveSubConnCh:
c := scToRemove[sc]
if c == 0 {
t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
}
scToRemove[sc] = c - 1
case <-removeTimeout:
t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
}
}
}
// Wrap the rr builder, so it behaves the same, but has a different pointer.
type noopBalancerBuilderWrapper struct {
balancer.Builder
}
// After removing a sub-balancer, re-add with same ID, but different balancer
// builder. Old subconns should be removed, and new subconns should be created.
func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
// Re-add sub-balancer-1, but with a different balancer builder. The
// sub-balancer was still in cache, but cann't be reused. This should cause
// old sub-balancer's subconns to be removed immediately, and new subconns
// to be created.
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], &noopBalancerBuilderWrapper{rrBuilder})
// The cached sub-balancer should be closed, and the subconns should be
// removed immediately.
removeTimeout := time.After(time.Millisecond * 500)
scToRemove := map[balancer.SubConn]int{
addrToSC[testBackendAddrs[2]]: 1,
addrToSC[testBackendAddrs[3]]: 1,
}
for i := 0; i < len(scToRemove); i++ {
select {
case sc := <-cc.RemoveSubConnCh:
c := scToRemove[sc]
if c == 0 {
t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
}
scToRemove[sc] = c - 1
case <-removeTimeout:
t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
}
}
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[4:6]}})
newSCTimeout := time.After(time.Millisecond * 500)
scToAdd := map[resolver.Address]int{
testBackendAddrs[4]: 1,
testBackendAddrs[5]: 1,
}
for i := 0; i < len(scToAdd); i++ {
select {
case addr := <-cc.NewSubConnAddrsCh:
c := scToAdd[addr[0]]
if c == 0 {
t.Fatalf("Got newSubConn for %v when there's %d new expected", addr, c)
}
scToAdd[addr[0]] = c - 1
sc := <-cc.NewSubConnCh
addrToSC[addr[0]] = sc
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
case <-newSCTimeout:
t.Fatalf("timeout waiting for subConns (from new sub-balancer) to be newed")
}
}
// Test roundrobin on the new picker.
p3 := <-cc.NewPickerCh
want := []balancer.SubConn{
addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]],
addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]],
addrToSC[testBackendAddrs[4]], addrToSC[testBackendAddrs[5]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// After removing a sub-balancer, it will be kept in cache. Make sure that this
// sub-balancer's Close is called when the balancer group is closed.
func (s) TestBalancerGroup_CloseStopsBalancerInCache(t *testing.T) {
const balancerName = "stub-TestBalancerGroup_check_close"
closed := make(chan struct{})
stub.Register(balancerName, stub.BalancerFuncs{Close: func(_ *stub.BalancerData) {
close(closed)
}})
builder := balancer.Get(balancerName)
defer replaceDefaultSubBalancerCloseTimeout(time.Second)()
gator, bg, _, _ := initBalancerGroupForCachingTest(t)
// Add balancer, and remove
gator.Add(testBalancerIDs[2], 1)
bg.Add(testBalancerIDs[2], builder)
gator.Remove(testBalancerIDs[2])
bg.Remove(testBalancerIDs[2])
// Immediately close balancergroup, before the cache timeout.
bg.Close()
// Make sure the removed child balancer is closed eventually.
select {
case <-closed:
case <-time.After(time.Second * 2):
t.Fatalf("timeout waiting for the child balancer in cache to be closed")
}
}
// TestBalancerGroupBuildOptions verifies that the balancer.BuildOptions passed
// to the balancergroup at creation time is passed to child policies.
func (s) TestBalancerGroupBuildOptions(t *testing.T) {
const (
balancerName = "stubBalancer-TestBalancerGroupBuildOptions"
parent = int64(1234)
userAgent = "ua"
defaultTestTimeout = 1 * time.Second
)
// Setup the stub balancer such that we can read the build options passed to
// it in the UpdateClientConnState method.
bOpts := balancer.BuildOptions{
DialCreds: insecure.NewCredentials(),
ChannelzParentID: parent,
CustomUserAgent: userAgent,
}
stub.Register(balancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
if !cmp.Equal(bd.BuildOptions, bOpts) {
return fmt.Errorf("buildOptions in child balancer: %v, want %v", bd, bOpts)
}
return nil
},
})
cc := testutils.NewTestClientConn(t)
bg := New(cc, bOpts, nil, nil)
bg.Start()
// Add the stub balancer build above as a child policy.
balancerBuilder := balancer.Get(balancerName)
bg.Add(testBalancerIDs[0], balancerBuilder)
// Send an empty clientConn state change. This should trigger the
// verification of the buildOptions being passed to the child policy.
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{}); err != nil {
t.Fatal(err)
}
}

View File

@ -16,7 +16,6 @@
*
*/
// Package testutils provides utility types, for use in xds tests.
package testutils
import (
@ -244,18 +243,6 @@ func IsRoundRobin(want []balancer.SubConn, f func() balancer.SubConn) error {
return nil
}
// testClosure is a test util for TestIsRoundRobin.
type testClosure struct {
r []balancer.SubConn
i int
}
func (tc *testClosure) next() balancer.SubConn {
ret := tc.r[tc.i]
tc.i = (tc.i + 1) % len(tc.r)
return ret
}
// ErrTestConstPicker is error returned by test const picker.
var ErrTestConstPicker = fmt.Errorf("const picker error")

View File

@ -20,10 +20,10 @@
package balancer
import (
_ "google.golang.org/grpc/balancer/weightedtarget" // Register the weighted_target balancer
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the CDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/clusterimpl" // Register the xds_cluster_impl balancer
_ "google.golang.org/grpc/xds/internal/balancer/clustermanager" // Register the xds_cluster_manager balancer
_ "google.golang.org/grpc/xds/internal/balancer/clusterresolver" // Register the xds_cluster_resolver balancer
_ "google.golang.org/grpc/xds/internal/balancer/priority" // Register the priority balancer
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" // Register the weighted_target balancer
)

View File

@ -1,925 +0,0 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// All tests in this file are combination of balancer group and
// weighted_balancerstate_aggregator, aka weighted_target tests. The difference
// is weighted_target tests cannot add sub-balancers to balancer group directly,
// they instead uses balancer config to control sub-balancers. Even though not
// very suited, the tests still cover all the functionality.
//
// TODO: the tests should be moved to weighted_target, and balancer group's
// tests should use a mock balancerstate_aggregator.
package balancergroup
import (
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator"
"google.golang.org/grpc/xds/internal/testutils"
)
var (
rrBuilder = balancer.Get(roundrobin.Name)
pfBuilder = balancer.Get(grpc.PickFirstBalancerName)
testBalancerIDs = []string{"b1", "b2", "b3"}
testBackendAddrs []resolver.Address
)
const testBackendAddrsCount = 12
func init() {
for i := 0; i < testBackendAddrsCount; i++ {
testBackendAddrs = append(testBackendAddrs, resolver.Address{Addr: fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)})
}
// Disable caching for all tests. It will be re-enabled in caching specific
// tests.
DefaultSubBalancerCloseTimeout = time.Millisecond
}
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
return func() balancer.SubConn {
scst, _ := p.Pick(balancer.PickInfo{})
return scst.SubConn
}
}
func newTestBalancerGroup(t *testing.T) (*testutils.TestClientConn, *weightedaggregator.Aggregator, *BalancerGroup) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, balancer.BuildOptions{}, gator, nil)
bg.Start()
return cc, gator, bg
}
// 1 balancer, 1 backend -> 2 backends -> 1 backend.
func (s) TestBalancerGroup_OneRR_AddRemoveBackend(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t)
// Add one balancer to group.
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], rrBuilder)
// Send one resolved address.
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
// Send subconn state change.
sc1 := <-cc.NewSubConnCh
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
// Send two addresses.
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
// Expect one new subconn, send state update.
sc2 := <-cc.NewSubConnCh
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin pick.
p2 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Remove the first address.
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:2]}})
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
bg.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
// Test pick with only the second subconn.
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSC, _ := p3.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSC.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSC, sc2)
}
}
}
// 2 balancers, each with 1 backend.
func (s) TestBalancerGroup_TwoRR_OneBackend(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t)
// Add two balancers to group and send one resolved address to both
// balancers.
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc1 := <-cc.NewSubConnCh
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc2 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// 2 balancers, each with more than 1 backends.
func (s) TestBalancerGroup_TwoRR_MoreBackends(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t)
// Add two balancers to group and send one resolved address to both
// balancers.
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2, sc3, sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Turn sc2's connection down, should be RR between balancers.
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p2 := <-cc.NewPickerCh
// Expect two sc1's in the result, because balancer1 will be picked twice,
// but there's only one sc in it.
want = []balancer.SubConn{sc1, sc1, sc3, sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Remove sc3's addresses.
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[3:4]}})
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc3, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scToRemove)
}
bg.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
p3 := <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Turn sc1's connection down.
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p4 := <-cc.NewPickerCh
want = []balancer.SubConn{sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p4)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Turn last connection to connecting.
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
p5 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p5.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err)
}
}
// Turn all connections down.
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p6 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p6.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
}
}
}
// 2 balancers with different weights.
func (s) TestBalancerGroup_TwoRR_DifferentWeight_MoreBackends(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t)
// Add two balancers to group and send two resolved addresses to both
// balancers.
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// totally 3 balancers, add/remove balancer.
func (s) TestBalancerGroup_ThreeRR_RemoveBalancer(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t)
// Add three balancers to group and send one resolved address to both
// balancers.
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc1 := <-cc.NewSubConnCh
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:2]}})
sc2 := <-cc.NewSubConnCh
gator.Add(testBalancerIDs[2], 1)
bg.Add(testBalancerIDs[2], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:2]}})
sc3 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc2, sc3}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Remove the second balancer, while the others two are ready.
gator.Remove(testBalancerIDs[1])
bg.Remove(testBalancerIDs[1])
gator.BuildAndUpdate()
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove)
}
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc3}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// move balancer 3 into transient failure.
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// Remove the first balancer, while the third is transient failure.
gator.Remove(testBalancerIDs[0])
bg.Remove(testBalancerIDs[0])
gator.BuildAndUpdate()
scToRemove = <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p3.Pick(balancer.PickInfo{}); err != balancer.ErrTransientFailure {
t.Fatalf("want pick error %v, got %v", balancer.ErrTransientFailure, err)
}
}
}
// 2 balancers, change balancer weight.
func (s) TestBalancerGroup_TwoRR_ChangeWeight_MoreBackends(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t)
// Add two balancers to group and send two resolved addresses to both
// balancers.
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
sc1 := <-cc.NewSubConnCh
sc2 := <-cc.NewSubConnCh
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
sc3 := <-cc.NewSubConnCh
sc4 := <-cc.NewSubConnCh
// Send state changes for both subconns.
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
gator.UpdateWeight(testBalancerIDs[0], 3)
gator.BuildAndUpdate()
// Test roundrobin with new weight.
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc2, sc3, sc4}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// Create a new balancer group, add balancer and backends, but not start.
// - b1, weight 2, backends [0,1]
// - b2, weight 1, backends [2,3]
// Start the balancer group and check behavior.
//
// Close the balancer group, call add/remove/change weight/change address.
// - b2, weight 3, backends [0,3]
// - b3, weight 1, backends [1,2]
// Start the balancer group again and check for behavior.
func (s) TestBalancerGroup_start_close(t *testing.T) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, balancer.BuildOptions{}, gator, nil)
// Add two balancers to group and send two resolved addresses to both
// balancers.
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
m1 := make(map[resolver.Address]balancer.SubConn)
for i := 0; i < 4; i++ {
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m1[addrs[0]] = sc
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{
m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
gator.Stop()
bg.Close()
for i := 0; i < 4; i++ {
bg.UpdateSubConnState(<-cc.RemoveSubConnCh, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
}
// Add b3, weight 1, backends [1,2].
gator.Add(testBalancerIDs[2], 1)
bg.Add(testBalancerIDs[2], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:3]}})
// Remove b1.
gator.Remove(testBalancerIDs[0])
bg.Remove(testBalancerIDs[0])
// Update b2 to weight 3, backends [0,3].
gator.UpdateWeight(testBalancerIDs[1], 3)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3])}})
gator.Start()
bg.Start()
m2 := make(map[resolver.Address]balancer.SubConn)
for i := 0; i < 4; i++ {
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m2[addrs[0]] = sc
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
// Test roundrobin on the last picker.
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{
m2[testBackendAddrs[0]], m2[testBackendAddrs[0]], m2[testBackendAddrs[0]],
m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], m2[testBackendAddrs[3]],
m2[testBackendAddrs[1]], m2[testBackendAddrs[2]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// Test that balancer group start() doesn't deadlock if the balancer calls back
// into balancer group inline when it gets an update.
//
// The potential deadlock can happen if we
// - hold a lock and send updates to balancer (e.g. update resolved addresses)
// - the balancer calls back (NewSubConn or update picker) in line
// The callback will try to hold hte same lock again, which will cause a
// deadlock.
//
// This test starts the balancer group with a test balancer, will updates picker
// whenever it gets an address update. It's expected that start() doesn't block
// because of deadlock.
func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
const balancerName = "stub-TestBalancerGroup_start_close_deadlock"
stub.Register(balancerName, stub.BalancerFuncs{})
builder := balancer.Get(balancerName)
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, balancer.BuildOptions{}, gator, nil)
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], builder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], builder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
}
// Test that at init time, with two sub-balancers, if one sub-balancer reports
// transient_failure, the picks won't fail with transient_failure, and should
// instead wait for the other sub-balancer.
func (s) TestBalancerGroup_InitOneSubBalancerTransientFailure(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t)
// Add two balancers to group and send one resolved address to both
// balancers.
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc1 := <-cc.NewSubConnCh
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
<-cc.NewSubConnCh
// Set one subconn to TransientFailure, this will trigger one sub-balancer
// to report transient failure.
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
r, err := p1.Pick(balancer.PickInfo{})
if err != balancer.ErrNoSubConnAvailable {
t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrNoSubConnAvailable, r, err)
}
}
}
// Test that with two sub-balancers, both in transient_failure, if one turns
// connecting, the overall state stays in transient_failure, and all picks
// return transient failure error.
func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) {
cc, gator, bg := newTestBalancerGroup(t)
// Add two balancers to group and send one resolved address to both
// balancers.
gator.Add(testBalancerIDs[0], 1)
bg.Add(testBalancerIDs[0], pfBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc1 := <-cc.NewSubConnCh
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], pfBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:1]}})
sc2 := <-cc.NewSubConnCh
// Set both subconn to TransientFailure, this will put both sub-balancers in
// transient failure.
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
bg.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
r, err := p1.Pick(balancer.PickInfo{})
if err != balancer.ErrTransientFailure {
t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrTransientFailure, r, err)
}
}
// Set one subconn to Connecting, it shouldn't change the overall state.
bg.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
r, err := p2.Pick(balancer.PickInfo{})
if err != balancer.ErrTransientFailure {
t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrTransientFailure, r, err)
}
}
}
func replaceDefaultSubBalancerCloseTimeout(n time.Duration) func() {
old := DefaultSubBalancerCloseTimeout
DefaultSubBalancerCloseTimeout = n
return func() { DefaultSubBalancerCloseTimeout = old }
}
// initBalancerGroupForCachingTest creates a balancer group, and initialize it
// to be ready for caching tests.
//
// Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer
// is removed later, so the balancer group returned has one sub-balancer in its
// own map, and one sub-balancer in cache.
func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.TestClientConn, map[resolver.Address]balancer.SubConn) {
cc := testutils.NewTestClientConn(t)
gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
gator.Start()
bg := New(cc, balancer.BuildOptions{}, gator, nil)
// Add two balancers to group and send two resolved addresses to both
// balancers.
gator.Add(testBalancerIDs[0], 2)
bg.Add(testBalancerIDs[0], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
bg.Start()
m1 := make(map[resolver.Address]balancer.SubConn)
for i := 0; i < 4; i++ {
addrs := <-cc.NewSubConnAddrsCh
sc := <-cc.NewSubConnCh
m1[addrs[0]] = sc
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
// Test roundrobin on the last picker.
p1 := <-cc.NewPickerCh
want := []balancer.SubConn{
m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p1)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
gator.Remove(testBalancerIDs[1])
bg.Remove(testBalancerIDs[1])
gator.BuildAndUpdate()
// Don't wait for SubConns to be removed after close, because they are only
// removed after close timeout.
for i := 0; i < 10; i++ {
select {
case <-cc.RemoveSubConnCh:
t.Fatalf("Got request to remove subconn, want no remove subconn (because subconns were still in cache)")
default:
}
time.Sleep(time.Millisecond)
}
// Test roundrobin on the with only sub-balancer0.
p2 := <-cc.NewPickerCh
want = []balancer.SubConn{
m1[testBackendAddrs[0]], m1[testBackendAddrs[1]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
return gator, bg, cc, m1
}
// Test that if a sub-balancer is removed, and re-added within close timeout,
// the subConns won't be re-created.
func (s) TestBalancerGroup_locality_caching(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
// Turn down subconn for addr2, shouldn't get picker update because
// sub-balancer1 was removed.
bg.UpdateSubConnState(addrToSC[testBackendAddrs[2]], balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
for i := 0; i < 10; i++ {
select {
case <-cc.NewPickerCh:
t.Fatalf("Got new picker, want no new picker (because the sub-balancer was removed)")
default:
}
time.Sleep(time.Millisecond)
}
// Sleep, but sleep less then close timeout.
time.Sleep(time.Millisecond * 100)
// Re-add sub-balancer-1, because subconns were in cache, no new subconns
// should be created. But a new picker will still be generated, with subconn
// states update to date.
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], rrBuilder)
p3 := <-cc.NewPickerCh
want := []balancer.SubConn{
addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]],
addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]],
// addr2 is down, b2 only has addr3 in READY state.
addrToSC[testBackendAddrs[3]], addrToSC[testBackendAddrs[3]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
for i := 0; i < 10; i++ {
select {
case <-cc.NewSubConnAddrsCh:
t.Fatalf("Got new subconn, want no new subconn (because subconns were still in cache)")
default:
}
time.Sleep(time.Millisecond * 10)
}
}
// Sub-balancers are put in cache when they are removed. If balancer group is
// closed within close timeout, all subconns should still be rmeoved
// immediately.
func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
_, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
bg.Close()
// The balancer group is closed. The subconns should be removed immediately.
removeTimeout := time.After(time.Millisecond * 500)
scToRemove := map[balancer.SubConn]int{
addrToSC[testBackendAddrs[0]]: 1,
addrToSC[testBackendAddrs[1]]: 1,
addrToSC[testBackendAddrs[2]]: 1,
addrToSC[testBackendAddrs[3]]: 1,
}
for i := 0; i < len(scToRemove); i++ {
select {
case sc := <-cc.RemoveSubConnCh:
c := scToRemove[sc]
if c == 0 {
t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
}
scToRemove[sc] = c - 1
case <-removeTimeout:
t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
}
}
}
// Sub-balancers in cache will be closed if not re-added within timeout, and
// subConns will be removed.
func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(time.Second)()
_, _, cc, addrToSC := initBalancerGroupForCachingTest(t)
// The sub-balancer is not re-added within timeout. The subconns should be
// removed.
removeTimeout := time.After(DefaultSubBalancerCloseTimeout)
scToRemove := map[balancer.SubConn]int{
addrToSC[testBackendAddrs[2]]: 1,
addrToSC[testBackendAddrs[3]]: 1,
}
for i := 0; i < len(scToRemove); i++ {
select {
case sc := <-cc.RemoveSubConnCh:
c := scToRemove[sc]
if c == 0 {
t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
}
scToRemove[sc] = c - 1
case <-removeTimeout:
t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
}
}
}
// Wrap the rr builder, so it behaves the same, but has a different pointer.
type noopBalancerBuilderWrapper struct {
balancer.Builder
}
// After removing a sub-balancer, re-add with same ID, but different balancer
// builder. Old subconns should be removed, and new subconns should be created.
func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
// Re-add sub-balancer-1, but with a different balancer builder. The
// sub-balancer was still in cache, but cann't be reused. This should cause
// old sub-balancer's subconns to be removed immediately, and new subconns
// to be created.
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], &noopBalancerBuilderWrapper{rrBuilder})
// The cached sub-balancer should be closed, and the subconns should be
// removed immediately.
removeTimeout := time.After(time.Millisecond * 500)
scToRemove := map[balancer.SubConn]int{
addrToSC[testBackendAddrs[2]]: 1,
addrToSC[testBackendAddrs[3]]: 1,
}
for i := 0; i < len(scToRemove); i++ {
select {
case sc := <-cc.RemoveSubConnCh:
c := scToRemove[sc]
if c == 0 {
t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
}
scToRemove[sc] = c - 1
case <-removeTimeout:
t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
}
}
bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[4:6]}})
newSCTimeout := time.After(time.Millisecond * 500)
scToAdd := map[resolver.Address]int{
testBackendAddrs[4]: 1,
testBackendAddrs[5]: 1,
}
for i := 0; i < len(scToAdd); i++ {
select {
case addr := <-cc.NewSubConnAddrsCh:
c := scToAdd[addr[0]]
if c == 0 {
t.Fatalf("Got newSubConn for %v when there's %d new expected", addr, c)
}
scToAdd[addr[0]] = c - 1
sc := <-cc.NewSubConnCh
addrToSC[addr[0]] = sc
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
case <-newSCTimeout:
t.Fatalf("timeout waiting for subConns (from new sub-balancer) to be newed")
}
}
// Test roundrobin on the new picker.
p3 := <-cc.NewPickerCh
want := []balancer.SubConn{
addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]],
addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]],
addrToSC[testBackendAddrs[4]], addrToSC[testBackendAddrs[5]],
}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p3)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
}
// After removing a sub-balancer, it will be kept in cache. Make sure that this
// sub-balancer's Close is called when the balancer group is closed.
func (s) TestBalancerGroup_CloseStopsBalancerInCache(t *testing.T) {
const balancerName = "stub-TestBalancerGroup_check_close"
closed := make(chan struct{})
stub.Register(balancerName, stub.BalancerFuncs{Close: func(_ *stub.BalancerData) {
close(closed)
}})
builder := balancer.Get(balancerName)
defer replaceDefaultSubBalancerCloseTimeout(time.Second)()
gator, bg, _, _ := initBalancerGroupForCachingTest(t)
// Add balancer, and remove
gator.Add(testBalancerIDs[2], 1)
bg.Add(testBalancerIDs[2], builder)
gator.Remove(testBalancerIDs[2])
bg.Remove(testBalancerIDs[2])
// Immediately close balancergroup, before the cache timeout.
bg.Close()
// Make sure the removed child balancer is closed eventually.
select {
case <-closed:
case <-time.After(time.Second * 2):
t.Fatalf("timeout waiting for the child balancer in cache to be closed")
}
}
// TestBalancerGroupBuildOptions verifies that the balancer.BuildOptions passed
// to the balancergroup at creation time is passed to child policies.
func (s) TestBalancerGroupBuildOptions(t *testing.T) {
const (
balancerName = "stubBalancer-TestBalancerGroupBuildOptions"
parent = int64(1234)
userAgent = "ua"
defaultTestTimeout = 1 * time.Second
)
// Setup the stub balancer such that we can read the build options passed to
// it in the UpdateClientConnState method.
bOpts := balancer.BuildOptions{
DialCreds: insecure.NewCredentials(),
ChannelzParentID: parent,
CustomUserAgent: userAgent,
}
stub.Register(balancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
if !cmp.Equal(bd.BuildOptions, bOpts) {
return fmt.Errorf("buildOptions in child balancer: %v, want %v", bd, bOpts)
}
return nil
},
})
cc := testutils.NewTestClientConn(t)
bg := New(cc, bOpts, nil, nil)
bg.Start()
// Add the stub balancer build above as a child policy.
balancerBuilder := balancer.Get(balancerName)
bg.Add(testBalancerIDs[0], balancerBuilder)
// Send an empty clientConn state change. This should trigger the
// verification of the buildOptions being passed to the child policy.
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{}); err != nil {
t.Fatal(err)
}
}

View File

@ -34,7 +34,6 @@ import (
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/xds/matcher"
"google.golang.org/grpc/resolver"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
@ -129,7 +128,7 @@ func (p *fakeProvider) Close() {
// setupWithXDSCreds performs all the setup steps required for tests which use
// xDSCredentials.
func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) {
func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *testutils.TestClientConn, func()) {
t.Helper()
xdsC := fakeclient.NewClient()
builder := balancer.Get(cdsName)
@ -145,7 +144,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
}
// Create a new CDS balancer and pass it a fake balancer.ClientConn which we
// can use to inspect the different calls made by the balancer.
tcc := xdstestutils.NewTestClientConn(t)
tcc := testutils.NewTestClientConn(t)
cdsB := builder.Build(tcc, balancer.BuildOptions{DialCreds: creds})
// Override the creation of the EDS balancer to return a fake EDS balancer
@ -184,7 +183,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
// passed to the EDS balancer, and verifies that the CDS balancer forwards the
// call appropriately to its parent balancer.ClientConn with or without
// attributes bases on the value of wantFallback.
func makeNewSubConn(ctx context.Context, edsCC balancer.ClientConn, parentCC *xdstestutils.TestClientConn, wantFallback bool) (balancer.SubConn, error) {
func makeNewSubConn(ctx context.Context, edsCC balancer.ClientConn, parentCC *testutils.TestClientConn, wantFallback bool) (balancer.SubConn, error) {
dummyAddr := "foo-address"
addrs := []resolver.Address{{Addr: dummyAddr}}
sc, err := edsCC.NewSubConn(addrs, balancer.NewSubConnOptions{})

View File

@ -36,7 +36,6 @@ import (
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
@ -225,14 +224,14 @@ func edsCCS(service string, countMax *uint32, enableLRS bool, xdslbpolicy *inter
// setup creates a cdsBalancer and an edsBalancer (and overrides the
// newChildBalancer function to return it), and also returns a cleanup function.
func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) {
func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *testutils.TestClientConn, func()) {
t.Helper()
xdsC := fakeclient.NewClient()
builder := balancer.Get(cdsName)
if builder == nil {
t.Fatalf("balancer.Get(%q) returned nil", cdsName)
}
tcc := xdstestutils.NewTestClientConn(t)
tcc := testutils.NewTestClientConn(t)
cdsB := builder.Build(tcc, balancer.BuildOptions{})
edsB := newTestEDSBalancer()
@ -250,7 +249,7 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *x
// setupWithWatch does everything that setup does, and also pushes a ClientConn
// update to the cdsBalancer and waits for a CDS watch call to be registered.
func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) {
func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *testutils.TestClientConn, func()) {
t.Helper()
xdsC, cdsB, edsB, tcc, cancel := setup(t)
@ -692,7 +691,7 @@ func (s) TestClose(t *testing.T) {
// Make sure that the UpdateSubConnState() method on the CDS balancer does
// not forward the update to the EDS balancer.
cdsB.UpdateSubConnState(&xdstestutils.TestSubConn{}, balancer.SubConnState{})
cdsB.UpdateSubConnState(&testutils.TestSubConn{}, balancer.SubConnState{})
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := edsB.waitForSubConnUpdate(sCtx, subConnWithState{}); err != context.DeadlineExceeded {

View File

@ -36,9 +36,9 @@ import (
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpctest"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/load"

View File

@ -24,8 +24,8 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/balancer/roundrobin"
_ "google.golang.org/grpc/balancer/weightedtarget"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget"
)
const (

View File

@ -25,12 +25,12 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/balancergroup"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
)
const balancerName = "xds_cluster_manager_experimental"

View File

@ -31,13 +31,12 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/hierarchy"
itestutils "google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/testutils"
)
type s struct {
@ -524,7 +523,7 @@ func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) {
// Setup the stub balancer such that we can read the build options passed to
// it in the UpdateClientConnState method.
ccsCh := itestutils.NewChannel()
ccsCh := testutils.NewChannel()
bOpts := balancer.BuildOptions{
DialCreds: insecure.NewCredentials(),
ChannelzParentID: parent,

View File

@ -23,9 +23,9 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/balancer/weightedtarget"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer"
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget"
)
const (

View File

@ -25,6 +25,7 @@ import (
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/internal/hierarchy"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
@ -32,7 +33,6 @@ import (
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

View File

@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/internal/hierarchy"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
@ -37,7 +38,6 @@ import (
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

View File

@ -26,14 +26,15 @@ import (
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancergroup"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget"
"google.golang.org/grpc/xds/internal/testutils"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
@ -101,7 +102,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
defer cleanup()
// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
@ -115,7 +116,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
}
// The same locality, add one more backend.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil)
@ -129,7 +130,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
}
// The same locality, delete first backend.
clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil)
@ -145,7 +146,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
}
// The same locality, replace backend.
clab4 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab4.Build()), nil)
@ -164,7 +165,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
}
// The same locality, different drop rate, dropping 50%.
clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{"test-drop": 50})
clab5 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{"test-drop": 50})
clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab5.Build()), nil)
@ -186,7 +187,7 @@ func (s) TestEDS_OneLocality(t *testing.T) {
}
// The same locality, remove drops.
clab6 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab6.Build()), nil)
@ -207,7 +208,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
defer cleanup()
// Two localities, each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc1 := <-cc.NewSubConnCh
@ -229,7 +230,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
}
// Add another locality, with one backend.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
@ -245,7 +246,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
}
// Remove first locality.
clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab3.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil)
@ -262,7 +263,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
}
// Add a backend to the last locality.
clab4 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab4.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab4.Build()), nil)
@ -281,7 +282,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
}
// Change weight of the locality[1].
clab5 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab5 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab5.AddLocality(testSubZones[1], 2, 0, testEndpointAddrs[1:2], nil)
clab5.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab5.Build()), nil)
@ -296,7 +297,7 @@ func (s) TestEDS_TwoLocalities(t *testing.T) {
}
// Change weight of the locality[1] to 0, it should never be picked.
clab6 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.AddLocality(testSubZones[1], 0, 0, testEndpointAddrs[1:2], nil)
clab6.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab6.Build()), nil)
@ -328,8 +329,8 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) {
defer cleanup()
// Two localities, each 3 backend, one Healthy, one Unhealthy, one Unknown.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:6], &testutils.AddLocalityOptions{
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:6], &xdstestutils.AddLocalityOptions{
Health: []corepb.HealthStatus{
corepb.HealthStatus_HEALTHY,
corepb.HealthStatus_UNHEALTHY,
@ -339,7 +340,7 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) {
corepb.HealthStatus_DEGRADED,
},
})
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[6:12], &testutils.AddLocalityOptions{
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[6:12], &xdstestutils.AddLocalityOptions{
Health: []corepb.HealthStatus{
corepb.HealthStatus_HEALTHY,
corepb.HealthStatus_UNHEALTHY,
@ -413,7 +414,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
}
// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
@ -472,7 +473,7 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) {
}
// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc1 := <-cc.NewSubConnCh

View File

@ -26,9 +26,10 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/testutils"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
)
// When a high priority is ready, adding/removing lower locality doesn't cause
@ -40,7 +41,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
defer cleanup()
// Two localities, with priorities [0, 1], each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
@ -61,7 +62,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
}
// Add p2, it shouldn't cause any updates.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
@ -78,7 +79,7 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) {
}
// Remove p2, no updates.
clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab3.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil)
@ -103,7 +104,7 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
defer cleanup()
// Two localities, with priorities [0, 1], each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
@ -139,7 +140,7 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
}
// Add p2, it shouldn't cause any updates.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
@ -171,7 +172,7 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) {
}
// Remove 2, use 1.
clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab3.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil)
@ -196,7 +197,7 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
// Two localities, with different priorities, each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
@ -222,7 +223,7 @@ func (s) TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
}
// Add p2, it should create a new SubConn.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
@ -249,7 +250,7 @@ func (s) TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
// Two localities, with priorities [0,1,2], each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab1.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
@ -340,7 +341,7 @@ func (s) TestEDSPriority_InitTimeout(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
// Two localities, with different priorities, each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
@ -383,7 +384,7 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
// Two localities, with different priorities, each with one backend.
clab0 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab0 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab0.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab0.Build()), nil)
@ -430,7 +431,7 @@ func (s) TestEDSPriority_MultipleLocalities(t *testing.T) {
}
// Add two localities, with two priorities, with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab1.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
@ -480,7 +481,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
// Two localities, with different priorities, each with one backend.
clab0 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab0 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab0.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab0.Build()), nil)
@ -498,7 +499,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
}
// Remove all priorities.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
// p0 subconn should be removed.
scToRemove := <-cc.RemoveSubConnCh
@ -515,7 +516,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
}
// Re-add two localities, with previous priorities, but different backends.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[3:4], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil)
@ -544,7 +545,7 @@ func (s) TestEDSPriority_RemovesAllLocalities(t *testing.T) {
}
// Remove p1 from EDS, to fallback to p0.
clab3 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil)
@ -587,7 +588,7 @@ func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
// Two localities, with priorities [0, 1], each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
@ -607,7 +608,7 @@ func (s) TestEDSPriority_HighPriorityNoEndpoints(t *testing.T) {
}
// Remove addresses from priority 0, should use p1.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, nil, nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil)
@ -638,7 +639,7 @@ func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
// Two localities, with priorities [0, 1], each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
@ -658,8 +659,8 @@ func (s) TestEDSPriority_HighPriorityAllUnhealthy(t *testing.T) {
}
// Set priority 0 endpoints to all unhealthy, should use p1.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], &testutils.AddLocalityOptions{
clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], &xdstestutils.AddLocalityOptions{
Health: []corepb.HealthStatus{corepb.HealthStatus_UNHEALTHY},
})
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
@ -699,11 +700,11 @@ func (s) TestEDSPriority_FirstPriorityRemoved(t *testing.T) {
_, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
// One localities, with priorities [0], each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
// Remove the only localities.
clab2 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
@ -751,7 +752,7 @@ func (s) TestFallbackToDNS(t *testing.T) {
}
// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)

View File

@ -28,8 +28,8 @@ import (
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

View File

@ -30,6 +30,7 @@ import (
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
@ -37,7 +38,6 @@ import (
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
)
// Name is the name of the priority balancer.

View File

@ -29,12 +29,12 @@ import (
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/hierarchy"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/testutils"
)
type s struct {

View File

@ -25,14 +25,13 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
grpctestutils "google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/testutils"
)
const resolveNowBalancerName = "test-resolve-now-balancer"
var resolveNowBalancerCCCh = grpctestutils.NewChannel()
var resolveNowBalancerCCCh = testutils.NewChannel()
type resolveNowBalancerBuilder struct {
balancer.Builder

View File

@ -26,7 +26,7 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/internal/testutils"
)
func newTestRing(cStats []connectivity.State) *ring {

View File

@ -30,8 +30,8 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/testutils"
)
var (

View File

@ -1,326 +0,0 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package weightedtarget
import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/testutils"
)
type testConfigBalancerBuilder struct {
balancer.Builder
}
func newTestConfigBalancerBuilder() *testConfigBalancerBuilder {
return &testConfigBalancerBuilder{
Builder: balancer.Get(roundrobin.Name),
}
}
func (t *testConfigBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
rr := t.Builder.Build(cc, opts)
return &testConfigBalancer{
Balancer: rr,
}
}
const testConfigBalancerName = "test_config_balancer"
func (t *testConfigBalancerBuilder) Name() string {
return testConfigBalancerName
}
type stringBalancerConfig struct {
serviceconfig.LoadBalancingConfig
s string
}
func (t *testConfigBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
// Return string without quotes.
return stringBalancerConfig{s: string(c[1 : len(c)-1])}, nil
}
// testConfigBalancer is a roundrobin balancer, but it takes the balancer config
// string and append it to the backend addresses.
type testConfigBalancer struct {
balancer.Balancer
}
func (b *testConfigBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
c, ok := s.BalancerConfig.(stringBalancerConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type %T", s.BalancerConfig)
}
oneMoreAddr := resolver.Address{Addr: c.s}
s.BalancerConfig = nil
s.ResolverState.Addresses = append(s.ResolverState.Addresses, oneMoreAddr)
return b.Balancer.UpdateClientConnState(s)
}
func (b *testConfigBalancer) Close() {
b.Balancer.Close()
}
var (
wtbBuilder balancer.Builder
wtbParser balancer.ConfigParser
testBackendAddrStrs []string
)
const testBackendAddrsCount = 12
func init() {
balancer.Register(newTestConfigBalancerBuilder())
for i := 0; i < testBackendAddrsCount; i++ {
testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
}
wtbBuilder = balancer.Get(Name)
wtbParser = wtbBuilder.(balancer.ConfigParser)
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
}
// TestWeightedTarget covers the cases that a sub-balancer is added and a
// sub-balancer is removed. It verifies that the addresses and balancer configs
// are forwarded to the right sub-balancer.
//
// This test is intended to test the glue code in weighted_target. Most of the
// functionality tests are covered by the balancer group tests.
func TestWeightedTarget(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
// Start with "cluster_1: round_robin".
config1, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_1":{"weight":1,"childPolicy":[{"round_robin":""}]}}}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0], Attributes: nil}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddr1, []string{"cluster_1"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
addr1 := <-cc.NewSubConnAddrsCh
if want := []resolver.Address{
hierarchy.Set(wantAddr1, []string{}),
}; !cmp.Equal(addr1, want, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr1, want, cmp.AllowUnexported(attributes.Attributes{})))
}
// Send subconn state change.
sc1 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
wtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
// Remove cluster_1, and add "cluster_2: test_config_balancer".
wantAddr3Str := testBackendAddrStrs[2]
config2, err := wtbParser.ParseConfig([]byte(
fmt.Sprintf(`{"targets":{"cluster_2":{"weight":1,"childPolicy":[{%q:%q}]}}}`, testConfigBalancerName, wantAddr3Str),
))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and one address with hierarchy path "cluster_2".
wantAddr2 := resolver.Address{Addr: testBackendAddrStrs[1], Attributes: nil}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddr2, []string{"cluster_2"}),
}},
BalancerConfig: config2,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Expect the address sent in the address list. The hierarchy path should be
// cleared.
addr2 := <-cc.NewSubConnAddrsCh
if want := []resolver.Address{
hierarchy.Set(wantAddr2, []string{}),
}; !cmp.Equal(addr2, want, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr2, want, cmp.AllowUnexported(attributes.Attributes{})))
}
// Expect the other address sent as balancer config. This address doesn't
// have hierarchy path.
wantAddr3 := resolver.Address{Addr: wantAddr3Str, Attributes: nil}
addr3 := <-cc.NewSubConnAddrsCh
if want := []resolver.Address{wantAddr3}; !cmp.Equal(addr3, want, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr3, want, cmp.AllowUnexported(attributes.Attributes{})))
}
// The subconn for cluster_1 should be removed.
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
wtb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
sc2 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
wtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
sc3 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
wtb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin pick with backends in cluster_2.
p2 := <-cc.NewPickerCh
want := []balancer.SubConn{sc2, sc3}
if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil {
t.Fatalf("want %v, got %v", want, err)
}
// Replace child policy of "cluster_1" to "round_robin".
config3, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_2":{"weight":1,"childPolicy":[{"round_robin":""}]}}}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddr4 := resolver.Address{Addr: testBackendAddrStrs[0], Attributes: nil}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddr4, []string{"cluster_2"}),
}},
BalancerConfig: config3,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
addr4 := <-cc.NewSubConnAddrsCh
if want := []resolver.Address{
hierarchy.Set(wantAddr4, []string{}),
}; !cmp.Equal(addr4, want, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr4, want, cmp.AllowUnexported(attributes.Attributes{})))
}
// Send subconn state change.
sc4 := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
wtb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p3.Pick(balancer.PickInfo{})
if !cmp.Equal(gotSCSt.SubConn, sc4, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc4)
}
}
}
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
return func() balancer.SubConn {
scst, _ := p.Pick(balancer.PickInfo{})
return scst.SubConn
}
}
const initIdleBalancerName = "test-init-Idle-balancer"
var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
func init() {
stub.Register(initIdleBalancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
err := fmt.Errorf("wrong picker error")
if state.ConnectivityState == connectivity.Idle {
err = errTestInitIdle
}
bd.ClientConn.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &testutils.TestConstPicker{Err: err},
})
},
})
}
// TestInitialIdle covers the case that if the child reports Idle, the overall
// state will be Idle.
func TestInitialIdle(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
// Start with "cluster_1: round_robin".
config1, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_1":{"weight":1,"childPolicy":[{"test-init-Idle-balancer":""}]}}}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], Attributes: nil},
}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
for range wantAddrs {
sc := <-cc.NewSubConnCh
wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle})
}
if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
t.Fatalf("Received aggregated state: %v, want Idle", state1)
}
}

View File

@ -48,7 +48,6 @@ import (
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/grpc/xds/internal/httpfilter/router"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
@ -819,7 +818,7 @@ func (s) TestXDSResolverWRR(t *testing.T) {
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR)
newWRR = xdstestutils.NewTestWRR
newWRR = testutils.NewTestWRR
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
@ -876,7 +875,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) {
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR)
newWRR = xdstestutils.NewTestWRR
newWRR = testutils.NewTestWRR
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
@ -1384,7 +1383,7 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) {
}
defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR)
newWRR = xdstestutils.NewTestWRR
newWRR = testutils.NewTestWRR
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.

View File

@ -22,13 +22,14 @@ import (
"testing"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/testutils"
)
func TestIsRoundRobin(t *testing.T) {
var (
sc1 = TestSubConns[0]
sc2 = TestSubConns[1]
sc3 = TestSubConns[2]
sc1 = testutils.TestSubConns[0]
sc2 = testutils.TestSubConns[1]
sc3 = testutils.TestSubConns[2]
)
testCases := []struct {
@ -125,10 +126,22 @@ func TestIsRoundRobin(t *testing.T) {
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
err := IsRoundRobin(tC.want, (&testClosure{r: tC.got}).next)
err := testutils.IsRoundRobin(tC.want, (&testClosure{r: tC.got}).next)
if err == nil != tC.pass {
t.Errorf("want pass %v, want %v, got err %v", tC.pass, tC.want, err)
}
})
}
}
// testClosure is a test util for TestIsRoundRobin.
type testClosure struct {
r []balancer.SubConn
i int
}
func (tc *testClosure) next() balancer.SubConn {
ret := tc.r[tc.i]
tc.i = (tc.i + 1) % len(tc.r)
return ret
}

View File

@ -0,0 +1,19 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package testutils provides utility types, for use in xds tests.
package testutils