From fe592260bf659cfbf539642ba3145dc998417a00 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 29 Aug 2022 16:23:12 -0700 Subject: [PATCH] clusterresolver: deflake eds_impl tests (#5562) --- internal/testutils/roundrobin/roundrobin.go | 223 +++++++ .../xds/e2e/setup_management_server.go | 15 +- test/balancer_switching_test.go | 44 +- test/roundrobin_test.go | 76 +-- .../clusterresolver/clusterresolver_test.go | 2 +- .../clusterresolver/e2e_test/eds_impl_test.go | 587 ++++++++++++++++++ .../balancer/clusterresolver/eds_impl_test.go | 569 ----------------- .../balancer/clusterresolver/priority_test.go | 59 ++ 8 files changed, 911 insertions(+), 664 deletions(-) create mode 100644 internal/testutils/roundrobin/roundrobin.go create mode 100644 xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go delete mode 100644 xds/internal/balancer/clusterresolver/eds_impl_test.go diff --git a/internal/testutils/roundrobin/roundrobin.go b/internal/testutils/roundrobin/roundrobin.go new file mode 100644 index 000000000..034c8c6f9 --- /dev/null +++ b/internal/testutils/roundrobin/roundrobin.go @@ -0,0 +1,223 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package roundrobin contains helper functions to check for roundrobin and +// weighted-roundrobin load balancing of RPCs in tests. +package roundrobin + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/resolver" + + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +var logger = grpclog.Component("testutils-roundrobin") + +// waitForTrafficToReachBackends repeatedly makes RPCs using the provided +// TestServiceClient until RPCs reach all backends specified in addrs, or the +// context expires, in which case a non-nil error is returned. +func waitForTrafficToReachBackends(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error { + // Make sure connections to all backends are up. We need to do this two + // times (to be sure that round_robin has kicked in) because the channel + // could have been configured with a different LB policy before the switch + // to round_robin. And the previous LB policy could be sharing backends with + // round_robin, and therefore in the first iteration of this loop, RPCs + // could land on backends owned by the previous LB policy. + for j := 0; j < 2; j++ { + for i := 0; i < len(addrs); i++ { + for { + time.Sleep(time.Millisecond) + if ctx.Err() != nil { + return fmt.Errorf("timeout waiting for connection to %q to be up", addrs[i].Addr) + } + var peer peer.Peer + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { + // Some tests remove backends and check if round robin is + // happening across the remaining backends. 
In such cases, + // RPCs can initially fail on the connection using the + // removed backend. Just keep retrying and eventually the + // connection using the removed backend will shutdown and + // will be removed. + continue + } + if peer.Addr.String() == addrs[i].Addr { + break + } + } + } + } + return nil +} + +// CheckRoundRobinRPCs verifies that EmptyCall RPCs on the given ClientConn, +// connected to a server exposing the test.grpc_testing.TestService, are +// roundrobined across the given backend addresses. +// +// Returns a non-nil error if context deadline expires before RPCs start to get +// roundrobined across the given backends. +func CheckRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error { + if err := waitForTrafficToReachBackends(ctx, client, addrs); err != nil { + return err + } + + // At this point, RPCs are getting successfully executed at the backends + // that we care about. To support duplicate addresses (in addrs) and + // backends being removed from the list of addresses passed to the + // roundrobin LB, we do the following: + // 1. Determine the count of RPCs that we expect each of our backends to + // receive per iteration. + // 2. Wait until the same pattern repeats a few times, or the context + // deadline expires. + wantAddrCount := make(map[string]int) + for _, addr := range addrs { + wantAddrCount[addr.Addr]++ + } + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + // Perform 3 more iterations. + var iterations [][]string + for i := 0; i < 3; i++ { + iteration := make([]string, len(addrs)) + for c := 0; c < len(addrs); c++ { + var peer peer.Peer + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { + return fmt.Errorf("EmptyCall() = %v, want ", err) + } + iteration[c] = peer.Addr.String() + } + iterations = append(iterations, iteration) + } + // Ensure the the first iteration contains all addresses in addrs. + gotAddrCount := make(map[string]int) + for _, addr := range iterations[0] { + gotAddrCount[addr]++ + } + if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" { + logger.Infof("non-roundrobin, got address count in one iteration: %v, want: %v, Diff: %s", gotAddrCount, wantAddrCount, diff) + continue + } + // Ensure all three iterations contain the same addresses. + if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) { + logger.Infof("non-roundrobin, first iter: %v, second iter: %v, third iter: %v", iterations[0], iterations[1], iterations[2]) + continue + } + return nil + } + return fmt.Errorf("Timeout when waiting for roundrobin distribution of RPCs across addresses: %v", addrs) +} + +// CheckWeightedRoundRobinRPCs verifies that EmptyCall RPCs on the given +// ClientConn, connected to a server exposing the test.grpc_testing.TestService, +// are weighted roundrobined (with randomness) across the given backend +// addresses. +// +// Returns a non-nil error if context deadline expires before RPCs start to get +// roundrobined across the given backends. +func CheckWeightedRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error { + if err := waitForTrafficToReachBackends(ctx, client, addrs); err != nil { + return err + } + + // At this point, RPCs are getting successfully executed at the backends + // that we care about. To take the randomness of the WRR into account, we + // look for approximate distribution instead of exact. 
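+	// For example, if addrs were [A, B, B] (B listed twice), wantAddrCount
+	// below would be {A: 1, B: 2} and wantRatio would be {A: 1/3, B: 2/3};
+	// the check then passes once the observed per-address ratios are within
+	// roughly 5% of these targets (see equalApproximate).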
+ wantAddrCount := make(map[string]int) + for _, addr := range addrs { + wantAddrCount[addr.Addr]++ + } + wantRatio := make(map[string]float64) + for addr, count := range wantAddrCount { + wantRatio[addr] = float64(count) / float64(len(addrs)) + } + + // There is a small possibility that RPCs are reaching backends that we + // don't expect them to reach here. The can happen because: + // - at time T0, the list of backends [A, B, C, D]. + // - at time T1, the test updates the list of backends to [A, B, C], and + // immediately starts attempting to check the distribution of RPCs to the + // new backends. + // - there is no way for the test to wait for a new picker to be pushed on + // to the channel (which contains the updated list of backends) before + // starting to attempt the RPC distribution checks. + // - This is usually a transitory state and will eventually fix itself when + // the new picker is pushed on the channel, and RPCs will start getting + // routed to only backends that we care about. + // + // We work around this situation by using two loops. The inner loop contains + // the meat of the calculations, and includes the logic which factors out + // the randomness in weighted roundrobin. If we ever see an RPCs getting + // routed to a backend that we dont expect it to get routed to, we break + // from the inner loop thereby resetting all state and start afresh. + for { + results := make(map[string]float64) + totalCount := float64(0) + InnerLoop: + for { + if ctx.Err() != nil { + return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v", addrs) + } + for i := 0; i < len(addrs); i++ { + var peer peer.Peer + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { + return fmt.Errorf("EmptyCall() = %v, want ", err) + } + if addr := peer.Addr.String(); wantAddrCount[addr] == 0 { + break InnerLoop + } + results[peer.Addr.String()]++ + } + totalCount += float64(len(addrs)) + + gotRatio := make(map[string]float64) + for addr, count := range results { + gotRatio[addr] = count / totalCount + } + if equalApproximate(gotRatio, wantRatio) { + return nil + } + logger.Infof("non-weighted-roundrobin, gotRatio: %v, wantRatio: %v", gotRatio, wantRatio) + } + <-time.After(time.Millisecond) + } +} + +func equalApproximate(got, want map[string]float64) bool { + if len(got) != len(want) { + return false + } + opt := cmp.Comparer(func(x, y float64) bool { + delta := math.Abs(x - y) + mean := math.Abs(x+y) / 2.0 + return delta/mean < 0.05 + }) + for addr := range want { + if !cmp.Equal(got[addr], want[addr], opt) { + return false + } + } + return true +} diff --git a/internal/testutils/xds/e2e/setup_management_server.go b/internal/testutils/xds/e2e/setup_management_server.go index c61f0620c..b5efa2bd1 100644 --- a/internal/testutils/xds/e2e/setup_management_server.go +++ b/internal/testutils/xds/e2e/setup_management_server.go @@ -89,12 +89,15 @@ func SetupManagementServer(t *testing.T, opts *ManagementServerOptions) (*Manage server.Stop() t.Fatalf("Failed to create bootstrap file: %v", err) } - resolverBuilder := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error)) - resolver, err := resolverBuilder(bootstrapContents) - if err != nil { - server.Stop() - t.Fatalf("Failed to create xDS resolver for testing: %v", err) + + var rb resolver.Builder + if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil { + rb, err = newResolver.(func([]byte) (resolver.Builder, 
error))(bootstrapContents) + if err != nil { + server.Stop() + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } } - return server, nodeID, bootstrapContents, resolver, func() { server.Stop() } + return server, nodeID, bootstrapContents, rb, func() { server.Stop() } } diff --git a/test/balancer_switching_test.go b/test/balancer_switching_test.go index ede88fda5..94ac79655 100644 --- a/test/balancer_switching_test.go +++ b/test/balancer_switching_test.go @@ -31,8 +31,10 @@ import ( "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils/fakegrpclb" + rrutil "google.golang.org/grpc/internal/testutils/roundrobin" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" + testpb "google.golang.org/grpc/test/grpc_testing" ) @@ -134,7 +136,8 @@ func (s) TestBalancerSwitch_Basic(t *testing.T) { Addresses: addrs, ServiceConfig: parseServiceConfig(t, r, rrServiceConfig), }) - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + client := testpb.NewTestServiceClient(cc) + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil { t.Fatal(err) } @@ -169,7 +172,8 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) { r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := checkRoundRobin(ctx, cc, addrs[0:1]); err != nil { + client := testpb.NewTestServiceClient(cc) + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[0:1]); err != nil { t.Fatal(err) } @@ -177,7 +181,7 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) { // This should not lead to a balancer switch. const nonExistentServer = "non-existent-grpclb-server-address" r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: nonExistentServer, Type: resolver.GRPCLB}}}) - if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil { t.Fatal(err) } @@ -220,14 +224,15 @@ func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) { // to the grpclb server we created above. This will cause the channel to // switch to the "grpclb" balancer, which returns a single backend address. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}}) - if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { + client := testpb.NewTestServiceClient(cc) + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil { t.Fatal(err) } // Push a resolver update containing a non-existent grpclb server address. // This should not lead to a balancer switch. 
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "nonExistentServer", Type: resolver.GRPCLB}}}) - if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil { t.Fatal(err) } @@ -272,7 +277,8 @@ func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) { r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { + client := testpb.NewTestServiceClient(cc) + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil { t.Fatal(err) } @@ -283,13 +289,13 @@ func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) { Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}, ServiceConfig: scpr, }) - if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil { t.Fatal(err) } // Switch back to "round_robin". r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr}) - if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil { t.Fatal(err) } } @@ -337,7 +343,8 @@ func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) { Addresses: addrs, ServiceConfig: parseServiceConfig(t, r, rrServiceConfig), }) - if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { + client := testpb.NewTestServiceClient(cc) + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil { t.Fatal(err) } } @@ -375,7 +382,8 @@ func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing r.UpdateState(resolver.State{ Addresses: append(addrs[1:], resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}), }) - if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { + client := testpb.NewTestServiceClient(cc) + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil { t.Fatal(err) } @@ -388,13 +396,13 @@ func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing Addresses: append(addrs[1:], resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}), ServiceConfig: scpr, }) - if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil { t.Fatal(err) } // Switch to "round_robin" by removing the address of type "grpclb". 
r.UpdateState(resolver.State{Addresses: addrs[1:]}) - if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil { t.Fatal(err) } } @@ -422,7 +430,8 @@ func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) { r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { + client := testpb.NewTestServiceClient(cc) + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil { t.Fatal(err) } @@ -432,7 +441,7 @@ func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) { Addresses: addrs[1:], ServiceConfig: parseServiceConfig(t, r, rrServiceConfig), }) - if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil { t.Fatal(err) } @@ -444,7 +453,7 @@ func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) { // else, the address of type "grpclb" should be ignored. grpclbAddr := resolver.Address{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB} r.UpdateState(resolver.State{Addresses: append(addrs[1:], grpclbAddr)}) - if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil { t.Fatal(err) } } @@ -545,7 +554,8 @@ func (s) TestBalancerSwitch_Graceful(t *testing.T) { Addresses: addrs[1:], ServiceConfig: parseServiceConfig(t, r, rrServiceConfig), }) - if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { + client := testpb.NewTestServiceClient(cc) + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil { t.Fatal(err) } @@ -591,7 +601,7 @@ func (s) TestBalancerSwitch_Graceful(t *testing.T) { t.Fatal("Timeout when waiting for a ClientConnState update on the new balancer") case <-ccUpdateCh: } - if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil { t.Fatal(err) } diff --git a/test/roundrobin_test.go b/test/roundrobin_test.go index 0a1300479..80f04dd25 100644 --- a/test/roundrobin_test.go +++ b/test/roundrobin_test.go @@ -20,12 +20,10 @@ package test import ( "context" - "fmt" "strings" "testing" "time" - "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" @@ -33,82 +31,17 @@ import ( "google.golang.org/grpc/internal/channelz" imetadata "google.golang.org/grpc/internal/metadata" "google.golang.org/grpc/internal/stubserver" + rrutil "google.golang.org/grpc/internal/testutils/roundrobin" "google.golang.org/grpc/metadata" - "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/status" - testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" ) const rrServiceConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}` -func checkRoundRobin(ctx context.Context, cc *grpc.ClientConn, addrs []resolver.Address) error { - client := testgrpc.NewTestServiceClient(cc) - // Make sure connections to all backends are up. We need to do this two - // times (to be sure that round_robin has kicked in) because the channel - // could have been configured with a different LB policy before the switch - // to round_robin. 
And the previous LB policy could be sharing backends with - // round_robin, and therefore in the first iteration of this loop, RPCs - // could land on backends owned by the previous LB policy. - backendCount := len(addrs) - for j := 0; j < 2; j++ { - for i := 0; i < backendCount; i++ { - for { - time.Sleep(time.Millisecond) - if ctx.Err() != nil { - return fmt.Errorf("timeout waiting for connection to %q to be up", addrs[i].Addr) - } - var peer peer.Peer - if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { - // Some tests remove backends and check if round robin is - // happening across the remaining backends. In such cases, - // RPCs can initially fail on the connection using the - // removed backend. Just keep retrying and eventually the - // connection using the removed backend will shutdown and - // will be removed. - continue - } - if peer.Addr.String() == addrs[i].Addr { - break - } - } - } - } - // Perform 3 iterations. - var iterations [][]string - for i := 0; i < 3; i++ { - iteration := make([]string, backendCount) - for c := 0; c < backendCount; c++ { - var peer peer.Peer - if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { - return fmt.Errorf("EmptyCall() = %v, want ", err) - } - iteration[c] = peer.Addr.String() - } - iterations = append(iterations, iteration) - } - // Ensure the the first iteration contains all addresses in addrs. To - // support duplicate addresses, we determine the count of each address. - wantAddrCount := make(map[string]int) - for _, addr := range addrs { - wantAddrCount[addr.Addr]++ - } - gotAddrCount := make(map[string]int) - for _, addr := range iterations[0] { - gotAddrCount[addr]++ - } - if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" { - return fmt.Errorf("non-roundrobin, got address count in one iteration: %v, want: %v, Diff: %s", gotAddrCount, wantAddrCount, diff) - } - // Ensure all three iterations contain the same addresses. 
- if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) { - return fmt.Errorf("non-roundrobin, first iter: %v, second iter: %v, third iter: %v", iterations[0], iterations[1], iterations[2]) - } - return nil -} - func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) { t.Helper() @@ -157,7 +90,7 @@ func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOpt } r.UpdateState(resolver.State{Addresses: addrs}) - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil { t.Fatal(err) } return cc, r, backends @@ -275,7 +208,8 @@ func (s) TestRoundRobin_OneServerDown(t *testing.T) { for i := 0; i < len(backends)-1; i++ { addrs[i] = resolver.Address{Addr: backends[i].Address} } - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + client := testpb.NewTestServiceClient(cc) + if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil { t.Fatalf("RPCs are not being round robined across remaining servers: %v", err) } } diff --git a/xds/internal/balancer/clusterresolver/clusterresolver_test.go b/xds/internal/balancer/clusterresolver/clusterresolver_test.go index 6e96f2e31..f6f6249d9 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver_test.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver_test.go @@ -48,7 +48,7 @@ import ( ) const ( - defaultTestTimeout = 1 * time.Second + defaultTestTimeout = 5 * time.Second defaultTestShortTimeout = 10 * time.Millisecond testEDSServcie = "test-eds-service-name" testClusterName = "test-cluster-name" diff --git a/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go new file mode 100644 index 000000000..6742675ed --- /dev/null +++ b/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go @@ -0,0 +1,587 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package e2e_test + +import ( + "context" + "errors" + "fmt" + "net" + "strconv" + "strings" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancergroup" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" + rrutil "google.golang.org/grpc/internal/testutils/roundrobin" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/status" + "google.golang.org/grpc/xds/internal/balancer/priority" + "google.golang.org/grpc/xds/internal/xdsclient" + + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + wrapperspb "github.com/golang/protobuf/ptypes/wrappers" + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" + + _ "google.golang.org/grpc/xds/internal/balancer/clusterresolver" // Register the "cluster_resolver_experimental" LB policy. + _ "google.golang.org/grpc/xds/internal/xdsclient/controller/version/v3" // Register the v3 xDS API client. +) + +const ( + clusterName = "cluster-my-service-client-side-xds" + edsServiceName = "endpoints-my-service-client-side-xds" + localityName1 = "my-locality-1" + localityName2 = "my-locality-2" + localityName3 = "my-locality-3" + + defaultTestTimeout = 5 * time.Second +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// backendAddressesAndPorts extracts the address and port of each of the +// StubServers passed in and returns them. Fails the test if any of the +// StubServers passed have an invalid address. +func backendAddressesAndPorts(t *testing.T, servers []*stubserver.StubServer) ([]resolver.Address, []uint32) { + addrs := make([]resolver.Address, len(servers)) + ports := make([]uint32, len(servers)) + for i := 0; i < len(servers); i++ { + addrs[i] = resolver.Address{Addr: servers[i].Address} + ports[i] = extractPortFromAddress(t, servers[i].Address) + } + return addrs, ports +} + +func extractPortFromAddress(t *testing.T, address string) uint32 { + _, p, err := net.SplitHostPort(address) + if err != nil { + t.Fatalf("invalid server address %q: %v", address, err) + } + port, err := strconv.ParseUint(p, 10, 32) + if err != nil { + t.Fatalf("invalid server address %q: %v", address, err) + } + return uint32(port) +} + +func startTestServiceBackends(t *testing.T, numBackends int) ([]*stubserver.StubServer, func()) { + servers := make([]*stubserver.StubServer, numBackends) + for i := 0; i < numBackends; i++ { + servers[i] = &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + } + servers[i].StartServer() + } + + return servers, func() { + for _, server := range servers { + server.Stop() + } + } +} + +// endpointResource returns an EDS resource for the given cluster name and +// localities. Backends within a locality are all assumed to be on the same +// machine (localhost). 
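+// For example (illustrative values), endpointResource("my-cluster",
+// []localityInfo{{name: "locality-a", weight: 1, ports: []uint32{8080}}})
+// returns a ClusterLoadAssignment with a single locality ("locality-a",
+// weight 1) containing one endpoint at localhost:8080.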
+func endpointResource(clusterName string, localities []localityInfo) *v3endpointpb.ClusterLoadAssignment { + var localityEndpoints []*v3endpointpb.LocalityLbEndpoints + for _, locality := range localities { + var endpoints []*v3endpointpb.LbEndpoint + for i, port := range locality.ports { + endpoint := &v3endpointpb.LbEndpoint{ + HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{ + Endpoint: &v3endpointpb.Endpoint{ + Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{ + SocketAddress: &v3corepb.SocketAddress{ + Protocol: v3corepb.SocketAddress_TCP, + Address: "localhost", + PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}}, + }, + }, + }, + }, + } + if i < len(locality.healthStatus) { + endpoint.HealthStatus = locality.healthStatus[i] + } + endpoints = append(endpoints, endpoint) + } + localityEndpoints = append(localityEndpoints, &v3endpointpb.LocalityLbEndpoints{ + Locality: &v3corepb.Locality{SubZone: locality.name}, + LbEndpoints: endpoints, + LoadBalancingWeight: &wrapperspb.UInt32Value{Value: locality.weight}, + }) + } + return &v3endpointpb.ClusterLoadAssignment{ + ClusterName: clusterName, + Endpoints: localityEndpoints, + } +} + +type localityInfo struct { + name string + weight uint32 + ports []uint32 + healthStatus []v3corepb.HealthStatus +} + +// clientEndpointsResource returns an EDS resource for the specified nodeID, +// service name and localities. +func clientEndpointsResource(nodeID, edsServiceName string, localities []localityInfo) e2e.UpdateOptions { + return e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpointResource(edsServiceName, localities)}, + SkipValidation: true, + } +} + +// TestEDS_OneLocality tests the cluster_resolver LB policy using an EDS +// resource with one locality. The following scenarios are tested: +// 1. Single backend. Test verifies that RPCs reach this backend. +// 2. Add a backend. Test verifies that RPCs are roundrobined across the two +// backends. +// 3. Remove one backend. Test verifies that all RPCs reach the other backend. +// 4. Replace the backend. Test verifies that all RPCs reach the new backend. +func (s) TestEDS_OneLocality(t *testing.T) { + // Spin up a management server to receive xDS resources from. + managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, nil) + defer cleanup1() + + // Start backend servers which provide an implementation of the TestService. + servers, cleanup2 := startTestServiceBackends(t, 3) + defer cleanup2() + addrs, ports := backendAddressesAndPorts(t, servers) + + // Create xDS resources for consumption by the test. We start off with a + // single backend in a single EDS locality. + resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:1]}}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client for use by the cluster_resolver LB policy. + client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Create a manual resolver and push a service config specifying the use of + // the cluster_resolver LB policy with a single discovery mechanism. 
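+	// The bootstrap-configured xDS client created above is attached to the
+	// resolver state via xdsclient.SetClient below, so the cluster_resolver
+	// LB policy uses this test-scoped client instead of creating its own.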
+ r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cluster_resolver_experimental":{ + "discoveryMechanisms": [{ + "cluster": "%s", + "type": "EDS", + "edsServiceName": "%s" + }], + "xdsLbPolicy":[{"round_robin":{}}] + } + }] + }`, clusterName, edsServiceName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client)) + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + // Ensure RPCs are being roundrobined across the single backend. + testClient := testpb.NewTestServiceClient(cc) + if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[:1]); err != nil { + t.Fatal(err) + } + + // Add a backend to the same locality, and ensure RPCs are sent in a + // roundrobin fashion across the two backends. + resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:2]}}) + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[:2]); err != nil { + t.Fatal(err) + } + + // Remove the first backend, and ensure all RPCs are sent to the second + // backend. + resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[1:2]}}) + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[1:2]); err != nil { + t.Fatal(err) + } + + // Replace the backend, and ensure all RPCs are sent to the new backend. + resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[2:3]}}) + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[2:3]); err != nil { + t.Fatal(err) + } +} + +// TestEDS_MultipleLocalities tests the cluster_resolver LB policy using an EDS +// resource with multiple localities. The following scenarios are tested: +// 1. Two localities, each with a single backend. Test verifies that RPCs are +// weighted roundrobined across these two backends. +// 2. Add another locality, with a single backend. Test verifies that RPCs are +// weighted roundrobined across all the backends. +// 3. Remove one locality. Test verifies that RPCs are weighted roundrobined +// across backends from the remaining localities. +// 4. Add a backend to one locality. Test verifies that RPCs are weighted +// roundrobined across localities. +// 5. Change the weight of one of the localities. Test verifies that RPCs are +// weighted roundrobined across the localities. +// +// In our LB policy tree, one of the descendents of the "cluster_resolver" LB +// policy is the "weighted_target" LB policy which performs weighted roundrobin +// across localities (and this has a randomness component associated with it). +// Therefore, the moment we have backends from more than one locality, RPCs are +// weighted roundrobined across them. +func (s) TestEDS_MultipleLocalities(t *testing.T) { + // Spin up a management server to receive xDS resources from. 
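+	// The returned nodeID is embedded in the EDS resources pushed below, and
+	// the bootstrap contents are used to create a test-scoped xDS client. The
+	// xDS resolver builder is discarded (blank identifier) because this test
+	// installs its own manual resolver.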
+ managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, nil) + defer cleanup1() + + // Start backend servers which provide an implementation of the TestService. + servers, cleanup2 := startTestServiceBackends(t, 4) + defer cleanup2() + addrs, ports := backendAddressesAndPorts(t, servers) + + // Create xDS resources for consumption by the test. We start off with two + // localities, and single backend in each of them. + resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{ + {name: localityName1, weight: 1, ports: ports[:1]}, + {name: localityName2, weight: 1, ports: ports[1:2]}, + }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client for use by the cluster_resolver LB policy. + client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Create a manual resolver and push service config specifying the use of + // the cluster_resolver LB policy with a single discovery mechanism. + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cluster_resolver_experimental":{ + "discoveryMechanisms": [{ + "cluster": "%s", + "type": "EDS", + "edsServiceName": "%s" + }], + "xdsLbPolicy":[{"round_robin":{}}] + } + }] + }`, clusterName, edsServiceName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client)) + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + // Ensure RPCs are being weighted roundrobined across the two backends. + testClient := testpb.NewTestServiceClient(cc) + if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, addrs[0:2]); err != nil { + t.Fatal(err) + } + + // Add another locality with a single backend, and ensure RPCs are being + // weighted roundrobined across the three backends. + resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{ + {name: localityName1, weight: 1, ports: ports[:1]}, + {name: localityName2, weight: 1, ports: ports[1:2]}, + {name: localityName3, weight: 1, ports: ports[2:3]}, + }) + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, addrs[0:3]); err != nil { + t.Fatal(err) + } + + // Remove the first locality, and ensure RPCs are being weighted + // roundrobined across the remaining two backends. + resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{ + {name: localityName2, weight: 1, ports: ports[1:2]}, + {name: localityName3, weight: 1, ports: ports[2:3]}, + }) + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, addrs[1:3]); err != nil { + t.Fatal(err) + } + + // Add a backend to one locality, and ensure weighted roundrobin. Since RPCs + // are roundrobined across localities, locality2's backend will receive + // twice the traffic. 
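+	// With equal locality weights, each locality receives half of the RPCs:
+	// locality2's only backend (addrs[1]) sees 1/2 of the traffic, while
+	// locality3 splits its half between addrs[2] and addrs[3] (1/4 each).
+	// The wantAddrs slice below encodes this 2:1:1 split by listing addrs[1]
+	// twice.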
+ resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{ + {name: localityName2, weight: 1, ports: ports[1:2]}, + {name: localityName3, weight: 1, ports: ports[2:4]}, + }) + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + wantAddrs := []resolver.Address{addrs[1], addrs[1], addrs[2], addrs[3]} + if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, wantAddrs); err != nil { + t.Fatal(err) + } + + // Change the weight of locality2 and ensure weighted roundrobin. Since + // locality2 has twice the weight of locality3, it will be picked twice as + // frequently as locality3 for RPCs. And since locality2 has a single + // backend and locality3 has two backends, the backend in locality2 will + // receive four times the traffic of each of locality3's backends. + resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{ + {name: localityName2, weight: 2, ports: ports[1:2]}, + {name: localityName3, weight: 1, ports: ports[2:4]}, + }) + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + wantAddrs = []resolver.Address{addrs[1], addrs[1], addrs[1], addrs[1], addrs[2], addrs[3]} + if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, wantAddrs); err != nil { + t.Fatal(err) + } +} + +// TestEDS_EndpointsHealth tests the cluster_resolver LB policy using an EDS +// resource which specifies endpoint health information and verifies that +// traffic is routed only to backends deemed capable of receiving traffic. +func (s) TestEDS_EndpointsHealth(t *testing.T) { + // Spin up a management server to receive xDS resources from. + managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, nil) + defer cleanup1() + + // Start backend servers which provide an implementation of the TestService. + servers, cleanup2 := startTestServiceBackends(t, 12) + defer cleanup2() + addrs, ports := backendAddressesAndPorts(t, servers) + + // Create xDS resources for consumption by the test. Two localities with + // six backends each, with two of the six backends being healthy. Both + // UNKNOWN and HEALTHY are considered by gRPC for load balancing. + resources := clientEndpointsResource(nodeID, edsServiceName, []localityInfo{ + {name: localityName1, weight: 1, ports: ports[:6], healthStatus: []v3corepb.HealthStatus{ + v3corepb.HealthStatus_UNKNOWN, + v3corepb.HealthStatus_HEALTHY, + v3corepb.HealthStatus_UNHEALTHY, + v3corepb.HealthStatus_DRAINING, + v3corepb.HealthStatus_TIMEOUT, + v3corepb.HealthStatus_DEGRADED, + }}, + {name: localityName2, weight: 1, ports: ports[6:12], healthStatus: []v3corepb.HealthStatus{ + v3corepb.HealthStatus_UNKNOWN, + v3corepb.HealthStatus_HEALTHY, + v3corepb.HealthStatus_UNHEALTHY, + v3corepb.HealthStatus_DRAINING, + v3corepb.HealthStatus_TIMEOUT, + v3corepb.HealthStatus_DEGRADED, + }}, + }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client for use by the cluster_resolver LB policy. + client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Create a manual resolver and push service config specifying the use of + // the cluster_resolver LB policy with a single discovery mechanism. 
+ r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cluster_resolver_experimental":{ + "discoveryMechanisms": [{ + "cluster": "%s", + "type": "EDS", + "edsServiceName": "%s" + }], + "xdsLbPolicy":[{"round_robin":{}}] + } + }] + }`, clusterName, edsServiceName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client)) + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + // Ensure RPCs are being weighted roundrobined across healthy backends from + // both localities. + testClient := testpb.NewTestServiceClient(cc) + if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, append(addrs[0:2], addrs[6:8]...)); err != nil { + t.Fatal(err) + } +} + +// TestEDS_EmptyUpdate tests the cluster_resolver LB policy using an EDS +// resource with no localities and verifies that RPCs fail with "all priorities +// removed" error. +func (s) TestEDS_EmptyUpdate(t *testing.T) { + // Spin up a management server to receive xDS resources from. + managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, nil) + defer cleanup1() + + // Start backend servers which provide an implementation of the TestService. + servers, cleanup2 := startTestServiceBackends(t, 4) + defer cleanup2() + addrs, ports := backendAddressesAndPorts(t, servers) + + oldCacheTimeout := balancergroup.DefaultSubBalancerCloseTimeout + balancergroup.DefaultSubBalancerCloseTimeout = 100 * time.Microsecond + defer func() { balancergroup.DefaultSubBalancerCloseTimeout = oldCacheTimeout }() + + // Create xDS resources for consumption by the test. The first update is an + // empty update. This should put the channel in TRANSIENT_FAILURE. + resources := clientEndpointsResource(nodeID, edsServiceName, nil) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client for use by the cluster_resolver LB policy. + client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Create a manual resolver and push service config specifying the use of + // the cluster_resolver LB policy with a single discovery mechanism. + r := manual.NewBuilderWithScheme("whatever") + jsonSC := fmt.Sprintf(`{ + "loadBalancingConfig":[{ + "cluster_resolver_experimental":{ + "discoveryMechanisms": [{ + "cluster": "%s", + "type": "EDS", + "edsServiceName": "%s" + }], + "xdsLbPolicy":[{"round_robin":{}}] + } + }] + }`, clusterName, edsServiceName) + scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) + r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client)) + + // Create a ClientConn and ensure that RPCs fail with "all priorities + // removed" error. This is the expected error when the cluster_resolver LB + // policy receives an EDS update with no localities. 
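+	// The helper waitForAllPrioritiesRemovedError (defined at the bottom of
+	// this file) treats the expected failure as an RPC error with code
+	// UNAVAILABLE whose message contains priority.ErrAllPrioritiesRemoved.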
+ cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + testClient := testpb.NewTestServiceClient(cc) + if err := waitForAllPrioritiesRemovedError(ctx, t, testClient); err != nil { + t.Fatal(err) + } + + // Add a locality with one backend and ensure RPCs are successful. + resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{{name: localityName1, weight: 1, ports: ports[:1]}}) + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[:1]); err != nil { + t.Fatal(err) + } + + // Push another empty update and ensure that RPCs fail with "all priorities + // removed" error again. + resources = clientEndpointsResource(nodeID, edsServiceName, nil) + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + if err := waitForAllPrioritiesRemovedError(ctx, t, testClient); err != nil { + t.Fatal(err) + } +} + +// waitForAllPrioritiesRemovedError repeatedly makes RPCs using the +// TestServiceClient until they fail with an error which indicates that all +// priorities have been removed. A non-nil error is returned if the context +// expires before RPCs fail with the expected error. +func waitForAllPrioritiesRemovedError(ctx context.Context, t *testing.T, client testgrpc.TestServiceClient) error { + for ; ctx.Err() == nil; <-time.After(time.Millisecond) { + _, err := client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil { + t.Log("EmptyCall() succeeded after EDS update with no localities") + continue + } + if code := status.Code(err); code != codes.Unavailable { + t.Logf("EmptyCall() returned code: %v, want: %v", code, codes.Unavailable) + continue + } + if !strings.Contains(err.Error(), priority.ErrAllPrioritiesRemoved.Error()) { + t.Logf("EmptyCall() = %v, want %v", err, priority.ErrAllPrioritiesRemoved) + continue + } + return nil + } + return errors.New("timeout when waiting for RPCs to fail with UNAVAILABLE status and priority.ErrAllPrioritiesRemoved error") +} diff --git a/xds/internal/balancer/clusterresolver/eds_impl_test.go b/xds/internal/balancer/clusterresolver/eds_impl_test.go deleted file mode 100644 index 2621b7914..000000000 --- a/xds/internal/balancer/clusterresolver/eds_impl_test.go +++ /dev/null @@ -1,569 +0,0 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package clusterresolver - -import ( - "context" - "fmt" - "sort" - "testing" - "time" - - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/weightedtarget" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/internal/balancergroup" - internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" - "google.golang.org/grpc/internal/testutils" - "google.golang.org/grpc/resolver" - "google.golang.org/grpc/xds/internal/balancer/clusterimpl" - "google.golang.org/grpc/xds/internal/balancer/priority" - xdstestutils "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeclient" - "google.golang.org/grpc/xds/internal/xdsclient" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" -) - -var ( - testClusterNames = []string{"test-cluster-1", "test-cluster-2"} - testSubZones = []string{"I", "II", "III", "IV"} - testEndpointAddrs []string -) - -const testBackendAddrsCount = 12 - -func init() { - for i := 0; i < testBackendAddrsCount; i++ { - testEndpointAddrs = append(testEndpointAddrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)) - } - balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond - clusterimpl.NewRandomWRR = testutils.NewTestWRR - weightedtarget.NewRandomWRR = testutils.NewTestWRR - balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond * 100 -} - -func setupTestEDS(t *testing.T, initChild *internalserviceconfig.BalancerConfig) (balancer.Balancer, *testutils.TestClientConn, *fakeclient.Client, func()) { - xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar) - cc := testutils.NewTestClientConn(t) - builder := balancer.Get(Name) - edsb := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}}) - if edsb == nil { - t.Fatalf("builder.Build(%s) failed and returned nil", Name) - } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := edsb.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), - BalancerConfig: &LBConfig{ - DiscoveryMechanisms: []DiscoveryMechanism{{ - Cluster: testClusterName, - Type: DiscoveryMechanismTypeEDS, - }}, - }, - }); err != nil { - edsb.Close() - xdsC.Close() - t.Fatal(err) - } - if _, err := xdsC.WaitForWatchEDS(ctx); err != nil { - edsb.Close() - xdsC.Close() - t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err) - } - return edsb, cc, xdsC, func() { - edsb.Close() - xdsC.Close() - } -} - -// One locality -// - add backend -// - remove backend -// - replace backend -// - change drop rate -func (s) TestEDS_OneLocality(t *testing.T) { - edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) - defer cleanup() - - // One locality with one backend. - clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) - - sc1 := <-cc.NewSubConnCh - edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - - // Pick with only the first backend. - if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1}); err != nil { - t.Fatal(err) - } - - // The same locality, add one more backend. 
- clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:2], nil) - xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) - - sc2 := <-cc.NewSubConnCh - edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - - // Test roundrobin with two subconns. - if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1, sc2}); err != nil { - t.Fatal(err) - } - - // The same locality, delete first backend. - clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil) - - scToRemove := <-cc.RemoveSubConnCh - if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) - } - edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) - - // Test pick with only the second subconn. - if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2}); err != nil { - t.Fatal(err) - } - - // The same locality, replace backend. - clab4 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab4.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab4.Build()), nil) - - sc3 := <-cc.NewSubConnCh - edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - scToRemove = <-cc.RemoveSubConnCh - if !cmp.Equal(scToRemove, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove) - } - edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) - - // Test pick with only the third subconn. - if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc3}); err != nil { - t.Fatal(err) - } - - // The same locality, different drop rate, dropping 50%. - clab5 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], map[string]uint32{"test-drop": 50}) - clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab5.Build()), nil) - - // Picks with drops. - if err := testPickerFromCh(cc.NewPickerCh, func(p balancer.Picker) error { - for i := 0; i < 100; i++ { - _, err := p.Pick(balancer.PickInfo{}) - // TODO: the dropping algorithm needs a design. When the dropping algorithm - // is fixed, this test also needs fix. - if i%2 == 0 && err == nil { - return fmt.Errorf("%d - the even number picks should be drops, got error ", i) - } else if i%2 != 0 && err != nil { - return fmt.Errorf("%d - the odd number picks should be non-drops, got error %v", i, err) - } - } - return nil - }); err != nil { - t.Fatal(err) - } - - // The same locality, remove drops. - clab6 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab6.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab6.Build()), nil) - - // Pick without drops. 
- if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc3}); err != nil { - t.Fatal(err) - } -} - -// 2 locality -// - start with 2 locality -// - add locality -// - remove locality -// - address change for the locality -// - update locality weight -func (s) TestEDS_TwoLocalities(t *testing.T) { - edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) - defer cleanup() - - // Two localities, each with one backend. - clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) - sc1 := <-cc.NewSubConnCh - edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - - // Add the second locality later to make sure sc2 belongs to the second - // locality. Otherwise the test is flaky because of a map is used in EDS to - // keep localities. - clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) - xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil) - sc2 := <-cc.NewSubConnCh - edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - - // Test roundrobin with two subconns. - if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1, sc2}); err != nil { - t.Fatal(err) - } - - // Add another locality, with one backend. - clab2 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab2.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) - clab2.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) - - sc3 := <-cc.NewSubConnCh - edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - edsb.UpdateSubConnState(sc3, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - - // Test roundrobin with three subconns. - if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1, sc2, sc3}); err != nil { - t.Fatal(err) - } - - // Remove first locality. - clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab3.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) - clab3.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil) - xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil) - - scToRemove := <-cc.RemoveSubConnCh - if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) - } - edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown}) - - // Test pick with two subconns (without the first one). - if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2, sc3}); err != nil { - t.Fatal(err) - } - - // Add a backend to the last locality. 
-	clab4 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
-	clab4.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
-	clab4.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
-	xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab4.Build()), nil)
-
-	sc4 := <-cc.NewSubConnCh
-	edsb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
-	edsb.UpdateSubConnState(sc4, balancer.SubConnState{ConnectivityState: connectivity.Ready})
-
-	// Test pick with two subconns (without the first one).
-	//
-	// Locality-1 will be picked twice, and locality-2 will be picked twice.
-	// Locality-1 contains only sc2, locality-2 contains sc3 and sc4. So expect
-	// two sc2's and sc3, sc4.
-	if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2, sc2, sc3, sc4}); err != nil {
-		t.Fatal(err)
-	}
-
-	// Change weight of the locality[1].
-	clab5 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
-	clab5.AddLocality(testSubZones[1], 2, 0, testEndpointAddrs[1:2], nil)
-	clab5.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
-	xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab5.Build()), nil)
-
-	// Test pick with two subconns different locality weight.
-	//
-	// Locality-1 will be picked four times, and locality-2 will be picked twice
-	// (weight 2 and 1). Locality-1 contains only sc2, locality-2 contains sc3 and
-	// sc4. So expect four sc2's and sc3, sc4.
-	if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2, sc2, sc2, sc2, sc3, sc4}); err != nil {
-		t.Fatal(err)
-	}
-}
-
-// The EDS balancer gets EDS resp with unhealthy endpoints. Test that only
-// healthy ones are used.
-func (s) TestEDS_EndpointsHealth(t *testing.T) {
-	edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
-	defer cleanup()
-
-	// Two localities, each 3 backend, one Healthy, one Unhealthy, one Unknown.
-	clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
-	clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:6], &xdstestutils.AddLocalityOptions{
-		Health: []corepb.HealthStatus{
-			corepb.HealthStatus_HEALTHY,
-			corepb.HealthStatus_UNHEALTHY,
-			corepb.HealthStatus_UNKNOWN,
-			corepb.HealthStatus_DRAINING,
-			corepb.HealthStatus_TIMEOUT,
-			corepb.HealthStatus_DEGRADED,
-		},
-	})
-	clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[6:12], &xdstestutils.AddLocalityOptions{
-		Health: []corepb.HealthStatus{
-			corepb.HealthStatus_HEALTHY,
-			corepb.HealthStatus_UNHEALTHY,
-			corepb.HealthStatus_UNKNOWN,
-			corepb.HealthStatus_DRAINING,
-			corepb.HealthStatus_TIMEOUT,
-			corepb.HealthStatus_DEGRADED,
-		},
-	})
-	xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
-
-	var (
-		readySCs           []balancer.SubConn
-		newSubConnAddrStrs []string
-	)
-	for i := 0; i < 4; i++ {
-		addr := <-cc.NewSubConnAddrsCh
-		newSubConnAddrStrs = append(newSubConnAddrStrs, addr[0].Addr)
-		sc := <-cc.NewSubConnCh
-		edsb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
-		edsb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
-		readySCs = append(readySCs, sc)
-	}
-
-	wantNewSubConnAddrStrs := []string{
-		testEndpointAddrs[0],
-		testEndpointAddrs[2],
-		testEndpointAddrs[6],
-		testEndpointAddrs[8],
-	}
-	sortStrTrans := cmp.Transformer("Sort", func(in []string) []string {
-		out := append([]string(nil), in...) // Copy input to avoid mutating it.
-		sort.Strings(out)
-		return out
-	})
-	if !cmp.Equal(newSubConnAddrStrs, wantNewSubConnAddrStrs, sortStrTrans) {
-		t.Fatalf("want newSubConn with address %v, got %v", wantNewSubConnAddrStrs, newSubConnAddrStrs)
-	}
-
-	// There should be exactly 4 new SubConns. Check to make sure there's no
-	// more subconns being created.
-	select {
-	case <-cc.NewSubConnCh:
-		t.Fatalf("Got unexpected new subconn")
-	case <-time.After(time.Microsecond * 100):
-	}
-
-	// Test roundrobin with the subconns.
-	if err := testRoundRobinPickerFromCh(cc.NewPickerCh, readySCs); err != nil {
-		t.Fatal(err)
-	}
-}
-
-// TestEDS_EmptyUpdate covers the cases when eds impl receives an empty update.
-//
-// It should send an error picker with transient failure to the parent.
-func (s) TestEDS_EmptyUpdate(t *testing.T) {
-	edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
-	defer cleanup()
-
-	const cacheTimeout = 100 * time.Microsecond
-	oldCacheTimeout := balancergroup.DefaultSubBalancerCloseTimeout
-	balancergroup.DefaultSubBalancerCloseTimeout = cacheTimeout
-	defer func() { balancergroup.DefaultSubBalancerCloseTimeout = oldCacheTimeout }()
-
-	// The first update is an empty update.
-	xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, nil)
-	// Pick should fail with transient failure, and all priority removed error.
-	if err := testErrPickerFromCh(cc.NewPickerCh, priority.ErrAllPrioritiesRemoved); err != nil {
-		t.Fatal(err)
-	}
-
-	// One locality with one backend.
-	clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
-	clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
-	xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
-
-	sc1 := <-cc.NewSubConnCh
-	edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
-	edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
-
-	// Pick with only the first backend.
-	if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1}); err != nil {
-		t.Fatal(err)
-	}
-
-	xdsC.InvokeWatchEDSCallback("", xdsresource.EndpointsUpdate{}, nil)
-	// Pick should fail with transient failure, and all priority removed error.
-	if err := testErrPickerFromCh(cc.NewPickerCh, priority.ErrAllPrioritiesRemoved); err != nil {
-		t.Fatal(err)
-	}
-
-	// Wait for the old SubConn to be removed (which happens when the child
-	// policy is closed), so a new update would trigger a new SubConn (we need
-	// this new SubConn to tell if the next picker is newly created).
-	scToRemove := <-cc.RemoveSubConnCh
-	if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
-		t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
-	}
-	edsb.UpdateSubConnState(scToRemove, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
-
-	// Handle another update with priorities and localities.
-	xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
-
-	sc2 := <-cc.NewSubConnCh
-	edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
-	edsb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
-
-	// Pick with only the first backend.
-	if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc2}); err != nil {
-		t.Fatal(err)
-	}
-}
-
-func (s) TestEDS_CircuitBreaking(t *testing.T) {
-	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-
-	edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
-	defer cleanup()
-
-	var maxRequests uint32 = 50
-	if err := edsb.UpdateClientConnState(balancer.ClientConnState{
-		BalancerConfig: &LBConfig{
-			DiscoveryMechanisms: []DiscoveryMechanism{{
-				Cluster:               testClusterName,
-				MaxConcurrentRequests: &maxRequests,
-				Type:                  DiscoveryMechanismTypeEDS,
-			}},
-		},
-	}); err != nil {
-		t.Fatal(err)
-	}
-
-	// One locality with one backend.
-	clab1 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
-	clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
-	xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
-	sc1 := <-cc.NewSubConnCh
-	edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
-	edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
-
-	// Picks with drops.
-	if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
-		dones := []func(){}
-		defer func() {
-			for _, f := range dones {
-				f()
-			}
-		}()
-
-		for i := 0; i < 100; i++ {
-			pr, err := p.Pick(balancer.PickInfo{})
-			if pr.Done != nil {
-				dones = append(dones, func() {
-					pr.Done(balancer.DoneInfo{})
-				})
-			}
-
-			if i < 50 && err != nil {
-				return fmt.Errorf("The first 50%% picks should be non-drops, got error %v", err)
-			} else if i > 50 && err == nil {
-				return fmt.Errorf("The second 50%% picks should be drops, got error <nil>")
-			}
-		}
-
-		for _, done := range dones {
-			done()
-		}
-		dones = []func(){}
-
-		// Pick without drops.
-		for i := 0; i < 50; i++ {
-			pr, err := p.Pick(balancer.PickInfo{})
-			if pr.Done != nil {
-				dones = append(dones, func() {
-					pr.Done(balancer.DoneInfo{})
-				})
-			}
-
-			if err != nil {
-				return fmt.Errorf("The third 50%% picks should be non-drops, got error %v", err)
-			}
-		}
-
-		return nil
-	}); err != nil {
-		t.Fatal(err.Error())
-	}
-
-	// Send another update, with only circuit breaking update (and no picker
-	// update afterwards). Make sure the new picker uses the new configs.
-	var maxRequests2 uint32 = 10
-	if err := edsb.UpdateClientConnState(balancer.ClientConnState{
-		BalancerConfig: &LBConfig{
-			DiscoveryMechanisms: []DiscoveryMechanism{{
-				Cluster:               testClusterName,
-				MaxConcurrentRequests: &maxRequests2,
-				Type:                  DiscoveryMechanismTypeEDS,
-			}},
-		},
-	}); err != nil {
-		t.Fatal(err)
-	}
-
-	// Picks with drops.
-	if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
-		dones := []func(){}
-		defer func() {
-			for _, f := range dones {
-				f()
-			}
-		}()
-
-		for i := 0; i < 100; i++ {
-			pr, err := p.Pick(balancer.PickInfo{})
-			if pr.Done != nil {
-				dones = append(dones, func() {
-					pr.Done(balancer.DoneInfo{})
-				})
-			}
-			if i < 10 && err != nil {
-				return fmt.Errorf("The first 10%% picks should be non-drops, got error %v", err)
-			} else if i > 10 && err == nil {
-				return fmt.Errorf("The next 90%% picks should be drops, got error <nil>")
-			}
-		}
-
-		for _, done := range dones {
-			done()
-		}
-		dones = []func(){}
-
-		// Pick without drops.
-		for i := 0; i < 10; i++ {
-			pr, err := p.Pick(balancer.PickInfo{})
-			if pr.Done != nil {
-				dones = append(dones, func() {
-					pr.Done(balancer.DoneInfo{})
-				})
-			}
-
-			if err != nil {
-				return fmt.Errorf("The next 10%% picks should be non-drops, got error %v", err)
-			}
-		}
-		return nil
-	}); err != nil {
-		t.Fatal(err.Error())
-	}
-}
diff --git a/xds/internal/balancer/clusterresolver/priority_test.go b/xds/internal/balancer/clusterresolver/priority_test.go
index 4c8f0b57c..b08b82089 100644
--- a/xds/internal/balancer/clusterresolver/priority_test.go
+++ b/xds/internal/balancer/clusterresolver/priority_test.go
@@ -19,19 +19,78 @@ package clusterresolver
 
 import (
 	"context"
+	"fmt"
 	"testing"
 	"time"
 
 	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
 	"github.com/google/go-cmp/cmp"
 	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/balancer/weightedtarget"
 	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/internal/balancergroup"
+	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
 	"google.golang.org/grpc/internal/testutils"
 	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
 	"google.golang.org/grpc/xds/internal/balancer/priority"
 	xdstestutils "google.golang.org/grpc/xds/internal/testutils"
+	"google.golang.org/grpc/xds/internal/testutils/fakeclient"
+	"google.golang.org/grpc/xds/internal/xdsclient"
 )
 
+var (
+	testClusterNames  = []string{"test-cluster-1", "test-cluster-2"}
+	testSubZones      = []string{"I", "II", "III", "IV"}
+	testEndpointAddrs []string
+)
+
+const testBackendAddrsCount = 12
+
+func init() {
+	for i := 0; i < testBackendAddrsCount; i++ {
+		testEndpointAddrs = append(testEndpointAddrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
+	}
+	balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
+	clusterimpl.NewRandomWRR = testutils.NewTestWRR
+	weightedtarget.NewRandomWRR = testutils.NewTestWRR
+	balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond * 100
+}
+
+func setupTestEDS(t *testing.T, initChild *internalserviceconfig.BalancerConfig) (balancer.Balancer, *testutils.TestClientConn, *fakeclient.Client, func()) {
+	xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
+	cc := testutils.NewTestClientConn(t)
+	builder := balancer.Get(Name)
+	edsb := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}})
+	if edsb == nil {
+		t.Fatalf("builder.Build(%s) failed and returned nil", Name)
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	if err := edsb.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
+		BalancerConfig: &LBConfig{
+			DiscoveryMechanisms: []DiscoveryMechanism{{
+				Cluster: testClusterName,
+				Type:    DiscoveryMechanismTypeEDS,
+			}},
+		},
+	}); err != nil {
+		edsb.Close()
+		xdsC.Close()
+		t.Fatal(err)
+	}
+	if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
+		edsb.Close()
+		xdsC.Close()
+		t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
+	}
+	return edsb, cc, xdsC, func() {
+		edsb.Close()
+		xdsC.Close()
+	}
+}
+
 // When a high priority is ready, adding/removing lower locality doesn't cause
 // changes.
 //