clusterresolver: deflake eds_impl tests (#5562)

Easwar Swaminathan 2022-08-29 16:23:12 -07:00 committed by GitHub
parent d5dee5fdbd
commit fe592260bf
8 changed files with 911 additions and 664 deletions


@@ -0,0 +1,223 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package roundrobin contains helper functions to check for roundrobin and
// weighted-roundrobin load balancing of RPCs in tests.
package roundrobin
import (
"context"
"fmt"
"math"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"
)
var logger = grpclog.Component("testutils-roundrobin")
// waitForTrafficToReachBackends repeatedly makes RPCs using the provided
// TestServiceClient until RPCs reach all backends specified in addrs, or the
// context expires, in which case a non-nil error is returned.
func waitForTrafficToReachBackends(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
// Make sure connections to all backends are up. We need to do this two
// times (to be sure that round_robin has kicked in) because the channel
// could have been configured with a different LB policy before the switch
// to round_robin. And the previous LB policy could be sharing backends with
// round_robin, and therefore in the first iteration of this loop, RPCs
// could land on backends owned by the previous LB policy.
for j := 0; j < 2; j++ {
for i := 0; i < len(addrs); i++ {
for {
time.Sleep(time.Millisecond)
if ctx.Err() != nil {
return fmt.Errorf("timeout waiting for connection to %q to be up", addrs[i].Addr)
}
var peer peer.Peer
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
// Some tests remove backends and check if round robin is
// happening across the remaining backends. In such cases,
// RPCs can initially fail on the connection using the
// removed backend. Just keep retrying and eventually the
// connection using the removed backend will shutdown and
// will be removed.
continue
}
if peer.Addr.String() == addrs[i].Addr {
break
}
}
}
}
return nil
}
// CheckRoundRobinRPCs verifies that EmptyCall RPCs on the given ClientConn,
// connected to a server exposing the test.grpc_testing.TestService, are
// roundrobined across the given backend addresses.
//
// Returns a non-nil error if context deadline expires before RPCs start to get
// roundrobined across the given backends.
func CheckRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
if err := waitForTrafficToReachBackends(ctx, client, addrs); err != nil {
return err
}
// At this point, RPCs are getting successfully executed at the backends
// that we care about. To support duplicate addresses (in addrs) and
// backends being removed from the list of addresses passed to the
// roundrobin LB, we do the following:
// 1. Determine the count of RPCs that we expect each of our backends to
// receive per iteration.
// 2. Wait until the same pattern repeats a few times, or the context
// deadline expires.
wantAddrCount := make(map[string]int)
for _, addr := range addrs {
wantAddrCount[addr.Addr]++
}
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
// Perform 3 more iterations.
var iterations [][]string
for i := 0; i < 3; i++ {
iteration := make([]string, len(addrs))
for c := 0; c < len(addrs); c++ {
var peer peer.Peer
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
return fmt.Errorf("EmptyCall() = %v, want <nil>", err)
}
iteration[c] = peer.Addr.String()
}
iterations = append(iterations, iteration)
}
// Ensure that the first iteration contains all addresses in addrs.
gotAddrCount := make(map[string]int)
for _, addr := range iterations[0] {
gotAddrCount[addr]++
}
if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
logger.Infof("non-roundrobin, got address count in one iteration: %v, want: %v, Diff: %s", gotAddrCount, wantAddrCount, diff)
continue
}
// Ensure all three iterations contain the same addresses.
if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) {
logger.Infof("non-roundrobin, first iter: %v, second iter: %v, third iter: %v", iterations[0], iterations[1], iterations[2])
continue
}
return nil
}
return fmt.Errorf("Timeout when waiting for roundrobin distribution of RPCs across addresses: %v", addrs)
}
// CheckWeightedRoundRobinRPCs verifies that EmptyCall RPCs on the given
// ClientConn, connected to a server exposing the test.grpc_testing.TestService,
// are weighted roundrobined (with randomness) across the given backend
// addresses.
//
// Returns a non-nil error if context deadline expires before RPCs start to get
// roundrobined across the given backends.
func CheckWeightedRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
if err := waitForTrafficToReachBackends(ctx, client, addrs); err != nil {
return err
}
// At this point, RPCs are getting successfully executed at the backends
// that we care about. To take the randomness of the WRR into account, we
// look for approximate distribution instead of exact.
wantAddrCount := make(map[string]int)
for _, addr := range addrs {
wantAddrCount[addr.Addr]++
}
wantRatio := make(map[string]float64)
for addr, count := range wantAddrCount {
wantRatio[addr] = float64(count) / float64(len(addrs))
}
// There is a small possibility that RPCs are reaching backends that we
// don't expect them to reach here. This can happen because:
// - at time T0, the list of backends is [A, B, C, D].
// - at time T1, the test updates the list of backends to [A, B, C], and
// immediately starts attempting to check the distribution of RPCs to the
// new backends.
// - there is no way for the test to wait for a new picker to be pushed on
// to the channel (which contains the updated list of backends) before
// starting to attempt the RPC distribution checks.
// - This is usually a transitory state and will eventually fix itself when
// the new picker is pushed on the channel, and RPCs will start getting
// routed to only backends that we care about.
//
// We work around this situation by using two loops. The inner loop contains
// the meat of the calculations, and includes the logic which factors out
// the randomness in weighted roundrobin. If we ever see an RPC getting
// routed to a backend that we don't expect it to get routed to, we break
// from the inner loop thereby resetting all state and start afresh.
for {
results := make(map[string]float64)
totalCount := float64(0)
InnerLoop:
for {
if ctx.Err() != nil {
return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v", addrs)
}
for i := 0; i < len(addrs); i++ {
var peer peer.Peer
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
return fmt.Errorf("EmptyCall() = %v, want <nil>", err)
}
if addr := peer.Addr.String(); wantAddrCount[addr] == 0 {
break InnerLoop
}
results[peer.Addr.String()]++
}
totalCount += float64(len(addrs))
gotRatio := make(map[string]float64)
for addr, count := range results {
gotRatio[addr] = count / totalCount
}
if equalApproximate(gotRatio, wantRatio) {
return nil
}
logger.Infof("non-weighted-roundrobin, gotRatio: %v, wantRatio: %v", gotRatio, wantRatio)
}
<-time.After(time.Millisecond)
}
}
func equalApproximate(got, want map[string]float64) bool {
if len(got) != len(want) {
return false
}
opt := cmp.Comparer(func(x, y float64) bool {
delta := math.Abs(x - y)
mean := math.Abs(x+y) / 2.0
return delta/mean < 0.05
})
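// For example, a got ratio of 0.26 against a want ratio of 0.25 is treated as
// equal: delta = 0.01, mean = 0.255, and 0.01/0.255 ≈ 0.039, which is below
// the 0.05 threshold.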
for addr := range want {
if !cmp.Equal(got[addr], want[addr], opt) {
return false
}
}
return true
}
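As a point of reference, here is a minimal usage sketch of these helpers, assuming a test that can import the internal stubserver and roundrobin packages (i.e. one living inside the grpc-go module); the test name, resolver scheme, dial target, service config literal, and 5-second timeout below are illustrative choices, not part of this package.

package roundrobin_test

import (
	"context"
	"testing"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/stubserver"
	rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/resolver/manual"
	testpb "google.golang.org/grpc/test/grpc_testing"
)

// TestRoundRobinSketch is an illustrative test: it spins up two stub backends,
// dials them through a manual resolver with a round_robin service config, and
// asserts the round-robin distribution using the helper above.
func TestRoundRobinSketch(t *testing.T) {
	backends := make([]*stubserver.StubServer, 2)
	addrs := make([]resolver.Address, 2)
	for i := range backends {
		backends[i] = &stubserver.StubServer{
			EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
		}
		backends[i].StartServer()
		defer backends[i].Stop()
		addrs[i] = resolver.Address{Addr: backends[i].Address}
	}

	// Report both backend addresses through a manual resolver and ask for
	// round_robin via the default service config.
	r := manual.NewBuilderWithScheme("whatever")
	r.InitialState(resolver.State{Addresses: addrs})
	cc, err := grpc.Dial(r.Scheme()+":///test.service",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithResolvers(r),
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`))
	if err != nil {
		t.Fatalf("grpc.Dial() failed: %v", err)
	}
	defer cc.Close()

	// Verify RPCs are roundrobined across both backends.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := rrutil.CheckRoundRobinRPCs(ctx, testpb.NewTestServiceClient(cc), addrs); err != nil {
		t.Fatal(err)
	}
}

CheckWeightedRoundRobinRPCs is driven the same way; expected weights are expressed by repeating an address in the addrs slice, as the eds_impl tests later in this commit do.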


@@ -89,12 +89,15 @@ func SetupManagementServer(t *testing.T, opts *ManagementServerOptions) (*Manage
server.Stop()
t.Fatalf("Failed to create bootstrap file: %v", err)
}
resolverBuilder := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))
resolver, err := resolverBuilder(bootstrapContents)
if err != nil {
server.Stop()
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
var rb resolver.Builder
if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
rb, err = newResolver.(func([]byte) (resolver.Builder, error))(bootstrapContents)
if err != nil {
server.Stop()
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}
}
return server, nodeID, bootstrapContents, resolver, func() { server.Stop() }
return server, nodeID, bootstrapContents, rb, func() { server.Stop() }
}


@@ -31,8 +31,10 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/fakegrpclb"
rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
testpb "google.golang.org/grpc/test/grpc_testing"
)
@@ -134,7 +136,8 @@ func (s) TestBalancerSwitch_Basic(t *testing.T) {
Addresses: addrs,
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
client := testpb.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
t.Fatal(err)
}
@@ -169,7 +172,8 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkRoundRobin(ctx, cc, addrs[0:1]); err != nil {
client := testpb.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[0:1]); err != nil {
t.Fatal(err)
}
@@ -177,7 +181,7 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
// This should not lead to a balancer switch.
const nonExistentServer = "non-existent-grpclb-server-address"
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: nonExistentServer, Type: resolver.GRPCLB}}})
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
@@ -220,14 +224,15 @@ func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) {
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, which returns a single backend address.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
client := testpb.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
// Push a resolver update containing a non-existent grpclb server address.
// This should not lead to a balancer switch.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "nonExistentServer", Type: resolver.GRPCLB}}})
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
@@ -272,7 +277,8 @@ func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) {
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
client := testpb.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
@@ -283,13 +289,13 @@ func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) {
Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}},
ServiceConfig: scpr,
})
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
// Switch back to "round_robin".
r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr})
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
}
@@ -337,7 +343,8 @@ func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) {
Addresses: addrs,
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
client := testpb.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
}
@@ -375,7 +382,8 @@ func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing
r.UpdateState(resolver.State{
Addresses: append(addrs[1:], resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}),
})
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
client := testpb.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
@@ -388,13 +396,13 @@ func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing
Addresses: append(addrs[1:], resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}),
ServiceConfig: scpr,
})
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
// Switch to "round_robin" by removing the address of type "grpclb".
r.UpdateState(resolver.State{Addresses: addrs[1:]})
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
}
@@ -422,7 +430,8 @@ func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) {
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil {
client := testpb.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
t.Fatal(err)
}
@@ -432,7 +441,7 @@ func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) {
Addresses: addrs[1:],
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
@@ -444,7 +453,7 @@ func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) {
// else, the address of type "grpclb" should be ignored.
grpclbAddr := resolver.Address{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB}
r.UpdateState(resolver.State{Addresses: append(addrs[1:], grpclbAddr)})
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
}
@@ -545,7 +554,8 @@ func (s) TestBalancerSwitch_Graceful(t *testing.T) {
Addresses: addrs[1:],
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
client := testpb.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}
@@ -591,7 +601,7 @@ func (s) TestBalancerSwitch_Graceful(t *testing.T) {
t.Fatal("Timeout when waiting for a ClientConnState update on the new balancer")
case <-ccUpdateCh:
}
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
t.Fatal(err)
}


@@ -20,12 +20,10 @@ package test
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
@@ -33,82 +31,17 @@ import (
"google.golang.org/grpc/internal/channelz"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/internal/stubserver"
rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"
)
const rrServiceConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}`
func checkRoundRobin(ctx context.Context, cc *grpc.ClientConn, addrs []resolver.Address) error {
client := testgrpc.NewTestServiceClient(cc)
// Make sure connections to all backends are up. We need to do this two
// times (to be sure that round_robin has kicked in) because the channel
// could have been configured with a different LB policy before the switch
// to round_robin. And the previous LB policy could be sharing backends with
// round_robin, and therefore in the first iteration of this loop, RPCs
// could land on backends owned by the previous LB policy.
backendCount := len(addrs)
for j := 0; j < 2; j++ {
for i := 0; i < backendCount; i++ {
for {
time.Sleep(time.Millisecond)
if ctx.Err() != nil {
return fmt.Errorf("timeout waiting for connection to %q to be up", addrs[i].Addr)
}
var peer peer.Peer
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
// Some tests remove backends and check if round robin is
// happening across the remaining backends. In such cases,
// RPCs can initially fail on the connection using the
// removed backend. Just keep retrying and eventually the
// connection using the removed backend will shutdown and
// will be removed.
continue
}
if peer.Addr.String() == addrs[i].Addr {
break
}
}
}
}
// Perform 3 iterations.
var iterations [][]string
for i := 0; i < 3; i++ {
iteration := make([]string, backendCount)
for c := 0; c < backendCount; c++ {
var peer peer.Peer
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
return fmt.Errorf("EmptyCall() = %v, want <nil>", err)
}
iteration[c] = peer.Addr.String()
}
iterations = append(iterations, iteration)
}
// Ensure that the first iteration contains all addresses in addrs. To
// support duplicate addresses, we determine the count of each address.
wantAddrCount := make(map[string]int)
for _, addr := range addrs {
wantAddrCount[addr.Addr]++
}
gotAddrCount := make(map[string]int)
for _, addr := range iterations[0] {
gotAddrCount[addr]++
}
if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
return fmt.Errorf("non-roundrobin, got address count in one iteration: %v, want: %v, Diff: %s", gotAddrCount, wantAddrCount, diff)
}
// Ensure all three iterations contain the same addresses.
if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) {
return fmt.Errorf("non-roundrobin, first iter: %v, second iter: %v, third iter: %v", iterations[0], iterations[1], iterations[2])
}
return nil
}
func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) {
t.Helper()
@@ -157,7 +90,7 @@ func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOpt
}
r.UpdateState(resolver.State{Addresses: addrs})
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
t.Fatal(err)
}
return cc, r, backends
@@ -275,7 +208,8 @@ func (s) TestRoundRobin_OneServerDown(t *testing.T) {
for i := 0; i < len(backends)-1; i++ {
addrs[i] = resolver.Address{Addr: backends[i].Address}
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
client := testpb.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
t.Fatalf("RPCs are not being round robined across remaining servers: %v", err)
}
}


@@ -48,7 +48,7 @@ import (
)
const (
defaultTestTimeout = 1 * time.Second
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
testEDSServcie = "test-eds-service-name"
testClusterName = "test-cluster-name"


@@ -0,0 +1,587 @@
/*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package e2e_test
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/xdsclient"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"
_ "google.golang.org/grpc/xds/internal/balancer/clusterresolver" // Register the "cluster_resolver_experimental" LB policy.
_ "google.golang.org/grpc/xds/internal/xdsclient/controller/version/v3" // Register the v3 xDS API client.
)
const (
clusterName = "cluster-my-service-client-side-xds"
edsServiceName = "endpoints-my-service-client-side-xds"
localityName1 = "my-locality-1"
localityName2 = "my-locality-2"
localityName3 = "my-locality-3"
defaultTestTimeout = 5 * time.Second
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
// backendAddressesAndPorts extracts the address and port of each of the
// StubServers passed in and returns them. Fails the test if any of the
// StubServers passed have an invalid address.
func backendAddressesAndPorts(t *testing.T, servers []*stubserver.StubServer) ([]resolver.Address, []uint32) {
addrs := make([]resolver.Address, len(servers))
ports := make([]uint32, len(servers))
for i := 0; i < len(servers); i++ {
addrs[i] = resolver.Address{Addr: servers[i].Address}
ports[i] = extractPortFromAddress(t, servers[i].Address)
}
return addrs, ports
}
func extractPortFromAddress(t *testing.T, address string) uint32 {
_, p, err := net.SplitHostPort(address)
if err != nil {
t.Fatalf("invalid server address %q: %v", address, err)
}
port, err := strconv.ParseUint(p, 10, 32)
if err != nil {
t.Fatalf("invalid server address %q: %v", address, err)
}
return uint32(port)
}
func startTestServiceBackends(t *testing.T, numBackends int) ([]*stubserver.StubServer, func()) {
servers := make([]*stubserver.StubServer, numBackends)
for i := 0; i < numBackends; i++ {
servers[i] = &stubserver.StubServer{
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
}
servers[i].StartServer()
}
return servers, func() {
for _, server := range servers {
server.Stop()
}
}
}
// endpointResource returns an EDS resource for the given cluster name and
// localities. Backends within a locality are all assumed to be on the same
// machine (localhost).
func endpointResource(clusterName string, localities []localityInfo) *v3endpointpb.ClusterLoadAssignment {
var localityEndpoints []*v3endpointpb.LocalityLbEndpoints
for _, locality := range localities {
var endpoints []*v3endpointpb.LbEndpoint
for i, port := range locality.ports {
endpoint := &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{
Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: "localhost",
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
},
},
},
},
}
if i < len(locality.healthStatus) {
endpoint.HealthStatus = locality.healthStatus[i]
}
endpoints = append(endpoints, endpoint)
}
localityEndpoints = append(localityEndpoints, &v3endpointpb.LocalityLbEndpoints{
Locality: &v3corepb.Locality{SubZone: locality.name},
LbEndpoints: endpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: locality.weight},
})
}
return &v3endpointpb.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: localityEndpoints,
}
}
type localityInfo struct {
name string
weight uint32
ports []uint32
healthStatus []v3corepb.HealthStatus
}
// clientEndpointsResource returns an EDS resource for the specified nodeID,
// service name and localities.
func clientEndpointsResource(nodeID, edsServiceName string, localities []localityInfo) e2e.UpdateOptions {
return e2e.UpdateOptions{
NodeID: nodeID,
Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpointResource(edsServiceName, localities)},
SkipValidation: true,
}
}
// TestEDS_OneLocality tests the cluster_resolver LB policy using an EDS
// resource with one locality. The following scenarios are tested:
// 1. Single backend. Test verifies that RPCs reach this backend.
// 2. Add a backend. Test verifies that RPCs are roundrobined across the two
// backends.
// 3. Remove one backend. Test verifies that all RPCs reach the other backend.
// 4. Replace the backend. Test verifies that all RPCs reach the new backend.
func (s) TestEDS_OneLocality(t *testing.T) {
// Spin up a management server to receive xDS resources from.
managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, nil)
defer cleanup1()
// Start backend servers which provide an implementation of the TestService.
servers, cleanup2 := startTestServiceBackends(t, 3)
defer cleanup2()
addrs, ports := backendAddressesAndPorts(t, servers)
// Create xDS resources for consumption by the test. We start off with a
// single backend in a single EDS locality.
resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:1]}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Create an xDS client for use by the cluster_resolver LB policy.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
// Create a manual resolver and push a service config specifying the use of
// the cluster_resolver LB policy with a single discovery mechanism.
r := manual.NewBuilderWithScheme("whatever")
jsonSC := fmt.Sprintf(`{
"loadBalancingConfig":[{
"cluster_resolver_experimental":{
"discoveryMechanisms": [{
"cluster": "%s",
"type": "EDS",
"edsServiceName": "%s"
}],
"xdsLbPolicy":[{"round_robin":{}}]
}
}]
}`, clusterName, edsServiceName)
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client))
// Create a ClientConn and make a successful RPC.
cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
// Ensure RPCs are being roundrobined across the single backend.
testClient := testpb.NewTestServiceClient(cc)
if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[:1]); err != nil {
t.Fatal(err)
}
// Add a backend to the same locality, and ensure RPCs are sent in a
// roundrobin fashion across the two backends.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:2]}})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[:2]); err != nil {
t.Fatal(err)
}
// Remove the first backend, and ensure all RPCs are sent to the second
// backend.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[1:2]}})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[1:2]); err != nil {
t.Fatal(err)
}
// Replace the backend, and ensure all RPCs are sent to the new backend.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[2:3]}})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[2:3]); err != nil {
t.Fatal(err)
}
}
// TestEDS_MultipleLocalities tests the cluster_resolver LB policy using an EDS
// resource with multiple localities. The following scenarios are tested:
// 1. Two localities, each with a single backend. Test verifies that RPCs are
// weighted roundrobined across these two backends.
// 2. Add another locality, with a single backend. Test verifies that RPCs are
// weighted roundrobined across all the backends.
// 3. Remove one locality. Test verifies that RPCs are weighted roundrobined
// across backends from the remaining localities.
// 4. Add a backend to one locality. Test verifies that RPCs are weighted
// roundrobined across localities.
// 5. Change the weight of one of the localities. Test verifies that RPCs are
// weighted roundrobined across the localities.
//
// In our LB policy tree, one of the descendants of the "cluster_resolver" LB
// policy is the "weighted_target" LB policy which performs weighted roundrobin
// across localities (and this has a randomness component associated with it).
// Therefore, the moment we have backends from more than one locality, RPCs are
// weighted roundrobined across them.
func (s) TestEDS_MultipleLocalities(t *testing.T) {
// Spin up a management server to receive xDS resources from.
managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, nil)
defer cleanup1()
// Start backend servers which provide an implementation of the TestService.
servers, cleanup2 := startTestServiceBackends(t, 4)
defer cleanup2()
addrs, ports := backendAddressesAndPorts(t, servers)
// Create xDS resources for consumption by the test. We start off with two
// localities, and a single backend in each of them.
resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName1, weight: 1, ports: ports[:1]},
{name: localityName2, weight: 1, ports: ports[1:2]},
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Create an xDS client for use by the cluster_resolver LB policy.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
// Create a manual resolver and push service config specifying the use of
// the cluster_resolver LB policy with a single discovery mechanism.
r := manual.NewBuilderWithScheme("whatever")
jsonSC := fmt.Sprintf(`{
"loadBalancingConfig":[{
"cluster_resolver_experimental":{
"discoveryMechanisms": [{
"cluster": "%s",
"type": "EDS",
"edsServiceName": "%s"
}],
"xdsLbPolicy":[{"round_robin":{}}]
}
}]
}`, clusterName, edsServiceName)
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client))
// Create a ClientConn and make a successful RPC.
cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
// Ensure RPCs are being weighted roundrobined across the two backends.
testClient := testpb.NewTestServiceClient(cc)
if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, addrs[0:2]); err != nil {
t.Fatal(err)
}
// Add another locality with a single backend, and ensure RPCs are being
// weighted roundrobined across the three backends.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName1, weight: 1, ports: ports[:1]},
{name: localityName2, weight: 1, ports: ports[1:2]},
{name: localityName3, weight: 1, ports: ports[2:3]},
})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, addrs[0:3]); err != nil {
t.Fatal(err)
}
// Remove the first locality, and ensure RPCs are being weighted
// roundrobined across the remaining two backends.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName2, weight: 1, ports: ports[1:2]},
{name: localityName3, weight: 1, ports: ports[2:3]},
})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, addrs[1:3]); err != nil {
t.Fatal(err)
}
// Add a backend to one locality, and ensure weighted roundrobin. Since RPCs
// are roundrobined across localities, locality2's backend will receive
// twice the traffic.
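// With equal locality weights, each locality gets 1/2 of the RPCs: locality2's
// single backend therefore receives 2/4 of the traffic, while each of
// locality3's two backends receives 1/4, which is why addrs[1] appears twice
// in wantAddrs below.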
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName2, weight: 1, ports: ports[1:2]},
{name: localityName3, weight: 1, ports: ports[2:4]},
})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
wantAddrs := []resolver.Address{addrs[1], addrs[1], addrs[2], addrs[3]}
if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, wantAddrs); err != nil {
t.Fatal(err)
}
// Change the weight of locality2 and ensure weighted roundrobin. Since
// locality2 has twice the weight of locality3, it will be picked twice as
// frequently as locality3 for RPCs. And since locality2 has a single
// backend and locality3 has two backends, the backend in locality2 will
// receive four times the traffic of each of locality3's backends.
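// To spell out the expected split: locality2 (weight 2) gets 2/3 of the RPCs,
// all going to its single backend (4/6), while locality3 (weight 1) gets 1/3,
// split across its two backends (1/6 each). That 4:1:1 ratio is why wantAddrs
// below lists addrs[1] four times and addrs[2], addrs[3] once each.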
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName2, weight: 2, ports: ports[1:2]},
{name: localityName3, weight: 1, ports: ports[2:4]},
})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
wantAddrs = []resolver.Address{addrs[1], addrs[1], addrs[1], addrs[1], addrs[2], addrs[3]}
if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, wantAddrs); err != nil {
t.Fatal(err)
}
}
// TestEDS_EndpointsHealth tests the cluster_resolver LB policy using an EDS
// resource which specifies endpoint health information and verifies that
// traffic is routed only to backends deemed capable of receiving traffic.
func (s) TestEDS_EndpointsHealth(t *testing.T) {
// Spin up a management server to receive xDS resources from.
managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, nil)
defer cleanup1()
// Start backend servers which provide an implementation of the TestService.
servers, cleanup2 := startTestServiceBackends(t, 12)
defer cleanup2()
addrs, ports := backendAddressesAndPorts(t, servers)
// Create xDS resources for consumption by the test. Two localities with
// six backends each, with two of the six backends being healthy. Both
// UNKNOWN and HEALTHY are considered by gRPC for load balancing.
resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName1, weight: 1, ports: ports[:6], healthStatus: []v3corepb.HealthStatus{
v3corepb.HealthStatus_UNKNOWN,
v3corepb.HealthStatus_HEALTHY,
v3corepb.HealthStatus_UNHEALTHY,
v3corepb.HealthStatus_DRAINING,
v3corepb.HealthStatus_TIMEOUT,
v3corepb.HealthStatus_DEGRADED,
}},
{name: localityName2, weight: 1, ports: ports[6:12], healthStatus: []v3corepb.HealthStatus{
v3corepb.HealthStatus_UNKNOWN,
v3corepb.HealthStatus_HEALTHY,
v3corepb.HealthStatus_UNHEALTHY,
v3corepb.HealthStatus_DRAINING,
v3corepb.HealthStatus_TIMEOUT,
v3corepb.HealthStatus_DEGRADED,
}},
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Create an xDS client for use by the cluster_resolver LB policy.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
// Create a manual resolver and push service config specifying the use of
// the cluster_resolver LB policy with a single discovery mechanism.
r := manual.NewBuilderWithScheme("whatever")
jsonSC := fmt.Sprintf(`{
"loadBalancingConfig":[{
"cluster_resolver_experimental":{
"discoveryMechanisms": [{
"cluster": "%s",
"type": "EDS",
"edsServiceName": "%s"
}],
"xdsLbPolicy":[{"round_robin":{}}]
}
}]
}`, clusterName, edsServiceName)
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client))
// Create a ClientConn and make a successful RPC.
cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
// Ensure RPCs are being weighted roundrobined across healthy backends from
// both localities.
testClient := testpb.NewTestServiceClient(cc)
if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, append(addrs[0:2], addrs[6:8]...)); err != nil {
t.Fatal(err)
}
}
// TestEDS_EmptyUpdate tests the cluster_resolver LB policy using an EDS
// resource with no localities and verifies that RPCs fail with "all priorities
// removed" error.
func (s) TestEDS_EmptyUpdate(t *testing.T) {
// Spin up a management server to receive xDS resources from.
managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, nil)
defer cleanup1()
// Start backend servers which provide an implementation of the TestService.
servers, cleanup2 := startTestServiceBackends(t, 4)
defer cleanup2()
addrs, ports := backendAddressesAndPorts(t, servers)
oldCacheTimeout := balancergroup.DefaultSubBalancerCloseTimeout
balancergroup.DefaultSubBalancerCloseTimeout = 100 * time.Microsecond
defer func() { balancergroup.DefaultSubBalancerCloseTimeout = oldCacheTimeout }()
// Create xDS resources for consumption by the test. The first update is an
// empty update. This should put the channel in TRANSIENT_FAILURE.
resources := clientEndpointsResource(nodeID, edsServiceName, nil)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Create an xDS client for use by the cluster_resolver LB policy.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
// Create a manual resolver and push service config specifying the use of
// the cluster_resolver LB policy with a single discovery mechanism.
r := manual.NewBuilderWithScheme("whatever")
jsonSC := fmt.Sprintf(`{
"loadBalancingConfig":[{
"cluster_resolver_experimental":{
"discoveryMechanisms": [{
"cluster": "%s",
"type": "EDS",
"edsServiceName": "%s"
}],
"xdsLbPolicy":[{"round_robin":{}}]
}
}]
}`, clusterName, edsServiceName)
scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client))
// Create a ClientConn and ensure that RPCs fail with "all priorities
// removed" error. This is the expected error when the cluster_resolver LB
// policy receives an EDS update with no localities.
cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
testClient := testpb.NewTestServiceClient(cc)
if err := waitForAllPrioritiesRemovedError(ctx, t, testClient); err != nil {
t.Fatal(err)
}
// Add a locality with one backend and ensure RPCs are successful.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:1]}})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[:1]); err != nil {
t.Fatal(err)
}
// Push another empty update and ensure that RPCs fail with "all priorities
// removed" error again.
resources = clientEndpointsResource(nodeID, edsServiceName, nil)
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
if err := waitForAllPrioritiesRemovedError(ctx, t, testClient); err != nil {
t.Fatal(err)
}
}
// waitForAllPrioritiesRemovedError repeatedly makes RPCs using the
// TestServiceClient until they fail with an error which indicates that all
// priorities have been removed. A non-nil error is returned if the context
// expires before RPCs fail with the expected error.
func waitForAllPrioritiesRemovedError(ctx context.Context, t *testing.T, client testgrpc.TestServiceClient) error {
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if err == nil {
t.Log("EmptyCall() succeeded after EDS update with no localities")
continue
}
if code := status.Code(err); code != codes.Unavailable {
t.Logf("EmptyCall() returned code: %v, want: %v", code, codes.Unavailable)
continue
}
if !strings.Contains(err.Error(), priority.ErrAllPrioritiesRemoved.Error()) {
t.Logf("EmptyCall() = %v, want %v", err, priority.ErrAllPrioritiesRemoved)
continue
}
return nil
}
return errors.New("timeout when waiting for RPCs to fail with UNAVAILABLE status and priority.ErrAllPrioritiesRemoved error")
}


@@ -1,569 +0,0 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package clusterresolver
import (
"context"
"fmt"
"sort"
"testing"
"time"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancergroup"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/priority"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
var (
testClusterNames = []string{"test-cluster-1", "test-cluster-2"}
testSubZones = []string{"I", "II", "III", "IV"}
testEndpointAddrs []string
)
const testBackendAddrsCount = 12
func init() {
for i := 0; i < testBackendAddrsCount; i++ {
testEndpointAddrs = append(testEndpointAddrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
}
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
clusterimpl.NewRandomWRR = testutils.NewTestWRR
weightedtarget.NewRandomWRR = testutils.NewTestWRR
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond * 100
}
func setupTestEDS(t *testing.T, initChild *internalserviceconfig.BalancerConfig) (balancer.Balancer, *testutils.TestClientConn, *fakeclient.Client, func()) {
xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
cc := testutils.NewTestClientConn(t)
builder := balancer.Get(Name)
edsb := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}})
if edsb == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := edsb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []DiscoveryMechanism{{
Cluster: testClusterName,
Type: DiscoveryMechanismTypeEDS,
}},
},
}); err != nil {
edsb.Close()
xdsC.Close()
t.Fatal(err)
}
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
edsb.Close()
xdsC.Close()
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
return edsb, cc, xdsC, func() {
edsb.Close()
xdsC.Close()
}
}
// One locality
// - add backend
// - remove backend
// - replace backend
// - change drop rate
func (s) TestEDS_OneLocality(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
// One locality with one backend.
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc1 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Pick with only the first backend.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1}); err != nil {
t.Fatal(err)
}
// The same locality, add one more backend.
clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil)
sc2 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with two subconns.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1, sc2}); err != nil {
t.Fatal(err)
}
// The same locality, delete first backend.
clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil)
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
// Test pick with only the second subconn.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2}); err != nil {
t.Fatal(err)
}
// The same locality, replace backend.
clab4 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab4.Build()), nil)
sc3 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
scToRemove = <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove)
}
edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
// Test pick with only the third subconn.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc3}); err != nil {
t.Fatal(err)
}
// The same locality, different drop rate, dropping 50%.
clab5 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{"test-drop": 50})
clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab5.Build()), nil)
// Picks with drops.
if err := testPickerFromCh(cc.NewPickerCh, func(p balancer.Picker) error {
for i := 0; i < 100; i++ {
_, err := p.Pick(balancer.PickInfo{})
// TODO: the dropping algorithm needs a design. When the dropping algorithm
// is fixed, this test also needs fix.
if i%2 == 0 && err == nil {
return fmt.Errorf("%d - the even number picks should be drops, got error <nil>", i)
} else if i%2 != 0 && err != nil {
return fmt.Errorf("%d - the odd number picks should be non-drops, got error %v", i, err)
}
}
return nil
}); err != nil {
t.Fatal(err)
}
// The same locality, remove drops.
clab6 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab6.Build()), nil)
// Pick without drops.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc3}); err != nil {
t.Fatal(err)
}
}
// 2 locality
// - start with 2 locality
// - add locality
// - remove locality
// - address change for the <not-the-first> locality
// - update locality weight
func (s) TestEDS_TwoLocalities(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
// Two localities, each with one backend.
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc1 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Add the second locality later to make sure sc2 belongs to the second
// locality. Otherwise the test is flaky because a map is used in EDS to
// keep localities.
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc2 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with two subconns.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1, sc2}); err != nil {
t.Fatal(err)
}
// Add another locality, with one backend.
clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil)
sc3 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with three subconns.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1, sc2, sc3}); err != nil {
t.Fatal(err)
}
// Remove first locality.
clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab3.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil)
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
// Test pick with two subconns (without the first one).
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2, sc3}); err != nil {
t.Fatal(err)
}
// Add a backend to the last locality.
clab4 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab4.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab4.Build()), nil)
sc4 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with two subconns (without the first one).
//
// Locality-1 will be picked twice, and locality-2 will be picked twice.
// Locality-1 contains only sc2, locality-2 contains sc3 and sc4. So expect
// two sc2's and sc3, sc4.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2, sc2, sc3, sc4}); err != nil {
t.Fatal(err)
}
// Change weight of the locality[1].
clab5 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab5.AddLocality(testSubZones[1], 2, 0, testEndpointAddrs[1:2], nil)
clab5.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab5.Build()), nil)
// Test pick with two subconns different locality weight.
//
// Locality-1 will be picked four times, and locality-2 will be picked twice
// (weight 2 and 1). Locality-1 contains only sc2, locality-2 contains sc3 and
// sc4. So expect four sc2's and sc3, sc4.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2, sc2, sc2, sc2, sc3, sc4}); err != nil {
t.Fatal(err)
}
}
// The EDS balancer gets EDS resp with unhealthy endpoints. Test that only
// healthy ones are used.
func (s) TestEDS_EndpointsHealth(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
// Two localities, each 3 backend, one Healthy, one Unhealthy, one Unknown.
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:6], &xdstestutils.AddLocalityOptions{
Health: []corepb.HealthStatus{
corepb.HealthStatus_HEALTHY,
corepb.HealthStatus_UNHEALTHY,
corepb.HealthStatus_UNKNOWN,
corepb.HealthStatus_DRAINING,
corepb.HealthStatus_TIMEOUT,
corepb.HealthStatus_DEGRADED,
},
})
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[6:12], &xdstestutils.AddLocalityOptions{
Health: []corepb.HealthStatus{
corepb.HealthStatus_HEALTHY,
corepb.HealthStatus_UNHEALTHY,
corepb.HealthStatus_UNKNOWN,
corepb.HealthStatus_DRAINING,
corepb.HealthStatus_TIMEOUT,
corepb.HealthStatus_DEGRADED,
},
})
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
var (
readySCs []balancer.SubConn
newSubConnAddrStrs []string
)
for i := 0; i < 4; i++ {
addr := <-cc.NewSubConnAddrsCh
newSubConnAddrStrs = append(newSubConnAddrStrs, addr[0].Addr)
sc := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
readySCs = append(readySCs, sc)
}
wantNewSubConnAddrStrs := []string{
testEndpointAddrs[0],
testEndpointAddrs[2],
testEndpointAddrs[6],
testEndpointAddrs[8],
}
sortStrTrans := cmp.Transformer("Sort", func(in []string) []string {
out := append([]string(nil), in...) // Copy input to avoid mutating it.
sort.Strings(out)
return out
})
if !cmp.Equal(newSubConnAddrStrs, wantNewSubConnAddrStrs, sortStrTrans) {
t.Fatalf("want newSubConn with address %v, got %v", wantNewSubConnAddrStrs, newSubConnAddrStrs)
}
// There should be exactly four new SubConns. Make sure no more SubConns are
// being created.
select {
case <-cc.NewSubConnCh:
t.Fatalf("Got unexpected new subconn")
case <-time.After(time.Microsecond * 100):
}
// Test roundrobin with the subconns.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, readySCs); err != nil {
t.Fatal(err)
}
}

// TestEDS_EmptyUpdate covers the case where the EDS impl receives an empty
// update.
//
// It should send an error picker with transient failure to the parent.
func (s) TestEDS_EmptyUpdate(t *testing.T) {
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
const cacheTimeout = 100 * time.Microsecond
oldCacheTimeout := balancergroup.DefaultSubBalancerCloseTimeout
balancergroup.DefaultSubBalancerCloseTimeout = cacheTimeout
defer func() { balancergroup.DefaultSubBalancerCloseTimeout = oldCacheTimeout }()
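// Shortening the sub-balancer cache timeout ensures that the child policy
// (and hence its SubConn) is removed soon after the empty update, instead of
// being parked in the balancer group's cache.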
// The first update is an empty update.
xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, nil)
// Picks should fail with a transient failure error indicating that all
// priorities have been removed.
if err := testErrPickerFromCh(cc.NewPickerCh, priority.ErrAllPrioritiesRemoved); err != nil {
t.Fatal(err)
}
// One locality with one backend.
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc1 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Pick with only the first backend.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1}); err != nil {
t.Fatal(err)
}
xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, nil)
// Picks should fail with a transient failure error indicating that all
// priorities have been removed.
if err := testErrPickerFromCh(cc.NewPickerCh, priority.ErrAllPrioritiesRemoved); err != nil {
t.Fatal(err)
}
// Wait for the old SubConn to be removed (which happens when the child
// policy is closed), so that a new update triggers a new SubConn (we need
// this new SubConn to tell whether the next picker is newly created).
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
}
edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
// Handle another update with priorities and localities.
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc2 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Pick with only the first backend.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2}); err != nil {
t.Fatal(err)
}
}
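
// TestEDS_CircuitBreaking verifies that the EDS implementation honors the
// MaxConcurrentRequests setting from the discovery mechanism: once the
// configured number of RPCs is in flight, further picks are dropped, and an
// update that changes only the circuit breaking configuration takes effect
// without a new endpoints update.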
func (s) TestEDS_CircuitBreaking(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
var maxRequests uint32 = 50
if err := edsb.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []DiscoveryMechanism{{
Cluster: testClusterName,
MaxConcurrentRequests: &maxRequests,
Type: DiscoveryMechanismTypeEDS,
}},
},
}); err != nil {
t.Fatal(err)
}
// One locality with one backend.
clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
sc1 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Picks with drops.
if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
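// Collect the Done callbacks instead of invoking them right away: each
// successful pick counts as an in-flight RPC until its Done callback runs,
// which is what pushes the counter up to maxRequests and causes drops.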
dones := []func(){}
defer func() {
for _, f := range dones {
f()
}
}()
for i := 0; i < 100; i++ {
pr, err := p.Pick(balancer.PickInfo{})
if pr.Done != nil {
dones = append(dones, func() {
pr.Done(balancer.DoneInfo{})
})
}
if i < 50 && err != nil {
return fmt.Errorf("the first 50%% of picks should be non-drops, got error %v", err)
} else if i >= 50 && err == nil {
return fmt.Errorf("the second 50%% of picks should be drops, got error <nil>")
}
}
for _, done := range dones {
done()
}
dones = []func(){}
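// All previously picked RPCs have been reported as finished, so the
// in-flight counter is back to zero and the next batch of picks should
// succeed.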
// Pick without drops.
for i := 0; i < 50; i++ {
pr, err := p.Pick(balancer.PickInfo{})
if pr.Done != nil {
dones = append(dones, func() {
pr.Done(balancer.DoneInfo{})
})
}
if err != nil {
return fmt.Errorf("The third 50%% picks should be non-drops, got error %v", err)
}
}
return nil
}); err != nil {
t.Fatal(err.Error())
}
// Send another update that changes only the circuit breaking configuration
// (no endpoints update follows). Make sure the new picker uses the new
// limit.
var maxRequests2 uint32 = 10
if err := edsb.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []DiscoveryMechanism{{
Cluster: testClusterName,
MaxConcurrentRequests: &maxRequests2,
Type: DiscoveryMechanismTypeEDS,
}},
},
}); err != nil {
t.Fatal(err)
}
// Picks with drops.
if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
dones := []func(){}
defer func() {
for _, f := range dones {
f()
}
}()
for i := 0; i < 100; i++ {
pr, err := p.Pick(balancer.PickInfo{})
if pr.Done != nil {
dones = append(dones, func() {
pr.Done(balancer.DoneInfo{})
})
}
if i < 10 && err != nil {
return fmt.Errorf("the first 10%% of picks should be non-drops, got error %v", err)
} else if i >= 10 && err == nil {
return fmt.Errorf("the remaining 90%% of picks should be drops, got error <nil>")
}
}
for _, done := range dones {
done()
}
dones = []func(){}
// Pick without drops.
for i := 0; i < 10; i++ {
pr, err := p.Pick(balancer.PickInfo{})
if pr.Done != nil {
dones = append(dones, func() {
pr.Done(balancer.DoneInfo{})
})
}
if err != nil {
return fmt.Errorf("The next 10%% picks should be non-drops, got error %v", err)
}
}
return nil
}); err != nil {
t.Fatal(err.Error())
}
}

View File

@ -19,19 +19,78 @@ package clusterresolver
import (
"context"
"fmt"
"testing"
"time"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancergroup"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/priority"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
)
var (
testClusterNames = []string{"test-cluster-1", "test-cluster-2"}
testSubZones = []string{"I", "II", "III", "IV"}
testEndpointAddrs []string
)
const testBackendAddrsCount = 12
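
// init populates testEndpointAddrs with testBackendAddrsCount fake backend
// addresses, swaps in deterministic WRR constructors so that picker
// distribution checks are repeatable, and shortens the balancer group's
// sub-balancer cache timeout so tests do not wait on the production default.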
func init() {
for i := 0; i < testBackendAddrsCount; i++ {
testEndpointAddrs = append(testEndpointAddrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
}
clusterimpl.NewRandomWRR = testutils.NewTestWRR
weightedtarget.NewRandomWRR = testutils.NewTestWRR
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond * 100
}
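
// setupTestEDS creates a fake xDS client and a testing ClientConn, builds a
// clusterresolver balancer on top of them, pushes an initial configuration
// containing a single EDS discovery mechanism, and waits for an EDS watch to
// be registered with the fake client. It returns the balancer, the testing
// ClientConn (whose channels expose new SubConns and pickers), the fake xDS
// client, and a cleanup function that closes both the balancer and the
// client.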
func setupTestEDS(t *testing.T, initChild *internalserviceconfig.BalancerConfig) (balancer.Balancer, *testutils.TestClientConn, *fakeclient.Client, func()) {
xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
cc := testutils.NewTestClientConn(t)
builder := balancer.Get(Name)
edsb := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}})
if edsb == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := edsb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []DiscoveryMechanism{{
Cluster: testClusterName,
Type: DiscoveryMechanismTypeEDS,
}},
},
}); err != nil {
edsb.Close()
xdsC.Close()
t.Fatal(err)
}
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
edsb.Close()
xdsC.Close()
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
return edsb, cc, xdsC, func() {
edsb.Close()
xdsC.Close()
}
}
// When a high priority is ready, adding/removing lower locality doesn't cause
// changes.
//