grpc-go/balancer/leastrequest/leastrequest_test.go


/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package leastrequest
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
)
const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
)
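// defaultTestTimeout bounds each test's overall context, while
// defaultTestShortTimeout is used below as a polling interval when waiting
// for a condition to become true.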
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
func (s) TestParseConfig(t *testing.T) {
parser := bb{}
tests := []struct {
name string
input string
wantCfg serviceconfig.LoadBalancingConfig
wantErr string
}{
{
name: "happy-case-default",
input: `{}`,
wantCfg: &LBConfig{
ChoiceCount: 2,
},
},
{
name: "happy-case-choice-count-set",
input: `{"choiceCount": 3}`,
wantCfg: &LBConfig{
ChoiceCount: 3,
},
},
{
name: "happy-case-choice-count-greater-than-ten",
input: `{"choiceCount": 11}`,
wantCfg: &LBConfig{
ChoiceCount: 10,
},
},
{
name: "choice-count-less-than-2",
input: `{"choiceCount": 1}`,
wantErr: "must be >= 2",
},
{
name: "invalid-json",
input: "{{invalidjson{{",
wantErr: "invalid character",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
gotCfg, gotErr := parser.ParseConfig(json.RawMessage(test.input))
// Substring match makes this very tightly coupled to the
// internalserviceconfig.BalancerConfig error strings. However, it
// is important to distinguish the different types of error messages
// possible as the parser has a few defined buckets of ways it can
// error out.
if (gotErr != nil) != (test.wantErr != "") {
t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
}
if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
}
if test.wantErr != "" {
return
}
if diff := cmp.Diff(gotCfg, test.wantCfg); diff != "" {
t.Fatalf("ParseConfig(%v) got unexpected output, diff (-got +want): %v", test.input, diff)
}
})
}
}
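// normalizeChoiceCountSketch is an illustrative restatement of the parsing
// behavior exercised by the table above, not the parser under test: a missing
// choiceCount defaults to 2, values below 2 are rejected, and values above 10
// are capped at 10. The helper name and signature are invented for this
// sketch.
func normalizeChoiceCountSketch(choiceCount uint32) (uint32, error) {
	if choiceCount == 0 {
		// Field absent from the JSON config: fall back to the default of 2.
		choiceCount = 2
	}
	if choiceCount < 2 {
		return 0, fmt.Errorf("choiceCount: %v, must be >= 2", choiceCount)
	}
	if choiceCount > 10 {
		// Large values are capped rather than rejected.
		choiceCount = 10
	}
	return choiceCount, nil
}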
func startBackends(t *testing.T, numBackends int) []*stubserver.StubServer {
backends := make([]*stubserver.StubServer, 0, numBackends)
// Construct and start working backends.
for i := 0; i < numBackends; i++ {
backend := &stubserver.StubServer{
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
<-stream.Context().Done()
return nil
},
}
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Logf("Started good TestService backend at: %q", backend.Address)
t.Cleanup(func() { backend.Stop() })
backends = append(backends, backend)
}
return backends
}
// setupBackends spins up numBackends test backends, each listening on a port
// on localhost. The backends always reply to unary calls with an empty
// response and no error, and streaming calls block until the stream's context
// is done.
func setupBackends(t *testing.T, numBackends int) []string {
t.Helper()
addresses := make([]string, numBackends)
backends := startBackends(t, numBackends)
// Collect the addresses of the started backends.
for i := 0; i < numBackends; i++ {
addresses[i] = backends[i].Address
}
return addresses
}
// checkRoundRobinRPCs verifies that EmptyCall RPCs on the given ClientConn,
// connected to a server exposing the test.grpc_testing.TestService, are
// roundrobined across the given backend addresses.
//
// Returns a non-nil error if context deadline expires before RPCs start to get
// roundrobined across the given backends.
func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
wantAddrCount := make(map[string]int)
for _, addr := range addrs {
wantAddrCount[addr.Addr]++
}
gotAddrCount := make(map[string]int)
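// Poll once per millisecond, re-checking the RPC distribution each time,
// until the context deadline expires.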
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
gotAddrCount = make(map[string]int)
// Perform 3 iterations.
var iterations [][]string
for i := 0; i < 3; i++ {
iteration := make([]string, len(addrs))
for c := 0; c < len(addrs); c++ {
var peer peer.Peer
client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer))
iteration[c] = peer.Addr.String()
}
iterations = append(iterations, iteration)
}
// Ensure the first iteration contains all addresses in addrs.
for _, addr := range iterations[0] {
gotAddrCount[addr]++
}
if !cmp.Equal(gotAddrCount, wantAddrCount) {
continue
}
// Ensure all three iterations contain the same addresses.
if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) {
continue
}
return nil
}
return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v; got: %v", addrs, gotAddrCount)
}
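// leastRequestPickSketch is a simplified sketch of the selection logic the
// tests below rely on (not the picker implementation under test): sample
// choiceCount indexes via randuint32 and pick the candidate with the fewest
// outstanding RPCs, keeping the earlier sample on ties. The injected
// randuint32 values in the tests map directly onto these samples. The helper
// name and the outstanding slice are invented for this sketch.
func leastRequestPickSketch(outstanding []uint32, choiceCount int) int {
	pick := int(randuint32()) % len(outstanding)
	for i := 1; i < choiceCount; i++ {
		candidate := int(randuint32()) % len(outstanding)
		if outstanding[candidate] < outstanding[pick] {
			pick = candidate
		}
	}
	return pick
}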
// TestLeastRequestE2E tests the Least Request LB policy in an e2e style. The
// Least Request balancer is configured as the top level balancer of the
// channel, and is passed three addresses. Eventually, the test creates
// streams, which should land on certain backends according to the least
// request algorithm. The randomness in the picker is injected in the test to
// be deterministic, allowing the test to make assertions on the distribution.
func (s) TestLeastRequestE2E(t *testing.T) {
defer func(u func() uint32) {
randuint32 = u
}(randuint32)
var index int
indexes := []uint32{
0, 0, 1, 1, 2, 2, // Triggers a round robin distribution.
}
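// With choiceCount 2, each pick consumes two of the injected indexes; the
// identical pairs (0,0), (1,1), (2,2) force the pick onto the SubConn at that
// index, which yields the round robin distribution checked below.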
randuint32 = func() uint32 {
ret := indexes[index%len(indexes)]
index++
return ret
}
addresses := setupBackends(t, 3)
mr := manual.NewBuilderWithScheme("lr-e2e")
defer mr.Close()
// Configure least request as top level balancer of channel.
lrscJSON := `
{
"loadBalancingConfig": [
{
"least_request_experimental": {
"choiceCount": 2
}
}
]
}`
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
firstThreeAddresses := []resolver.Address{
{Addr: addresses[0]},
{Addr: addresses[1]},
{Addr: addresses[2]},
}
mr.InitialState(resolver.State{
Addresses: firstThreeAddresses,
ServiceConfig: sc,
})
cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testServiceClient := testgrpc.NewTestServiceClient(cc)
// Wait for RPCs to round robin across all 3 backends. This happens because a
// SubConn transitioning into READY causes a new picker update. Once the
// picker update with all 3 backends is present, this test can start to make
// assertions based on those backends.
if err := checkRoundRobinRPCs(ctx, testServiceClient, firstThreeAddresses); err != nil {
t.Fatalf("error in expected round robin: %v", err)
}
// The map ordering of READY SubConns is non-deterministic. Thus, perform 3
// RPCs, with the injected randomness pointing at index 0, 1, and 2 in turn,
// to learn which backend address sits at each index.
index = 0
peerAtIndex := make([]string, 3)
var peer0 peer.Peer
if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer0)); err != nil {
t.Fatalf("testServiceClient.EmptyCall failed: %v", err)
}
peerAtIndex[0] = peer0.Addr.String()
if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer0)); err != nil {
t.Fatalf("testServiceClient.EmptyCall failed: %v", err)
}
peerAtIndex[1] = peer0.Addr.String()
if _, err := testServiceClient.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer0)); err != nil {
t.Fatalf("testServiceClient.EmptyCall failed: %v", err)
}
peerAtIndex[2] = peer0.Addr.String()
// Start streaming RPCs, but do not finish them. Each subsequent stream
// should be started according to the least request algorithm, and chosen
// between the indexes provided.
index = 0
indexes = []uint32{
0, 0, // Causes first stream to be on first address.
0, 1, // Compares first address (one RPC) to second (no RPCs), so choose second.
1, 2, // Compares second address (one RPC) to third (no RPCs), so choose third.
0, 3, // Causes another stream on first address.
1, 0, // Compares second address (one RPC) to first (two RPCs), so choose second.
2, 0, // Compares third address (one RPC) to first (two RPCs), so choose third.
0, 0, // Causes another stream on first address.
2, 2, // Causes a stream on third address.
2, 1, // Compares third address (three RPCs) to second (two RPCs), so choose second.
}
wantIndex := []uint32{0, 1, 2, 0, 1, 2, 0, 2, 1}
// Start streaming RPCs, but do not finish them. Each created stream should
// be started based on the least request algorithm and injected randomness
// (see indexes slice above for exact expectations).
for _, wantIndex := range wantIndex {
stream, err := testServiceClient.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
}
p, ok := peer.FromContext(stream.Context())
if !ok {
t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
}
if p.Addr.String() != peerAtIndex[wantIndex] {
t.Fatalf("testServiceClient.FullDuplexCall's Peer got: %v, want: %v", p.Addr.String(), peerAtIndex[wantIndex])
}
}
}
// TestLeastRequestPersistsCounts tests that the Least Request Balancer
// persists counts once it gets a new picker update. It first updates the
// Least Request Balancer with two backends, and creates a bunch of streams on
// them. Then, it updates the Least Request Balancer with three backends,
// including the two previous ones. Any streams created afterward should be
// started on the new backend.
func (s) TestLeastRequestPersistsCounts(t *testing.T) {
defer func(u func() uint32) {
randuint32 = u
}(randuint32)
var index int
indexes := []uint32{
0, 0, 1, 1,
}
randuint32 = func() uint32 {
ret := indexes[index%len(indexes)]
index++
return ret
}
addresses := setupBackends(t, 3)
mr := manual.NewBuilderWithScheme("lr-e2e")
defer mr.Close()
// Configure least request as top level balancer of channel.
lrscJSON := `
{
"loadBalancingConfig": [
{
"least_request_experimental": {
"choiceCount": 2
}
}
]
}`
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
firstTwoAddresses := []resolver.Address{
{Addr: addresses[0]},
{Addr: addresses[1]},
}
mr.InitialState(resolver.State{
Addresses: firstTwoAddresses,
ServiceConfig: sc,
})
cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testServiceClient := testgrpc.NewTestServiceClient(cc)
// Wait for RPCs to round robin across the two backends. This happens because
// a SubConn transitioning into READY causes a new picker update. Once the
// picker update with the two backends is present, this test can start to
// populate those backends with streams.
if err := checkRoundRobinRPCs(ctx, testServiceClient, firstTwoAddresses); err != nil {
t.Fatalf("error in expected round robin: %v", err)
}
// Start 50 streaming RPCs, and leave them unfinished for the duration of
// the test. This will populate the first two addresses with many active
// RPCs.
for i := 0; i < 50; i++ {
_, err := testServiceClient.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
}
}
// Update the least request balancer to choice count 3. Also update the
// address list adding a third address. Alongside the injected randomness,
// this should trigger the least request balancer to search all created
// SubConns. Thus, since address 3 is the new address and the first two
// addresses are populated with RPCs, once the picker update of all 3 READY
// SubConns takes effect, all new streams should be started on address 3.
index = 0
indexes = []uint32{
0, 1, 2, 3, 4, 5,
}
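// With choiceCount 3, each pick consumes three injected indexes; both 0,1,2
// and 3,4,5 (taken modulo the number of SubConns) cover all three SubConns,
// so every pick compares the outstanding RPC counts of all of them.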
lrscJSON = `
{
"loadBalancingConfig": [
{
"least_request_experimental": {
"choiceCount": 3
}
}
]
}`
sc = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
fullAddresses := []resolver.Address{
{Addr: addresses[0]},
{Addr: addresses[1]},
{Addr: addresses[2]},
}
mr.UpdateState(resolver.State{
Addresses: fullAddresses,
ServiceConfig: sc,
})
newAddress := fullAddresses[2]
// Poll until only address 3 shows up. This requires a polling loop because
// the picker update containing all three SubConns doesn't take effect
// immediately; it needs the third SubConn to become READY.
if err := checkRoundRobinRPCs(ctx, testServiceClient, []resolver.Address{newAddress}); err != nil {
t.Fatalf("error in expected round robin: %v", err)
}
// Start 25 RPCs, but don't finish them. They should all start on address 3,
// since the first two addresses both have 25 RPCs (and randomness
// injection/choiceCount causes all 3 to be compared every iteration).
for i := 0; i < 25; i++ {
stream, err := testServiceClient.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
}
p, ok := peer.FromContext(stream.Context())
if !ok {
t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
}
if p.Addr.String() != addresses[2] {
t.Fatalf("testServiceClient.FullDuplexCall's Peer got: %v, want: %v", p.Addr.String(), addresses[2])
}
}
// Now that 25 RPCs are active on each address, the next three RPCs should
// round robin, since choiceCount is three and the injected random indexes
// cause the picker to compare the outstanding request counts of all three
// addresses on each pick.
wantAddrCount := map[string]int{
addresses[0]: 1,
addresses[1]: 1,
addresses[2]: 1,
}
gotAddrCount := make(map[string]int)
for i := 0; i < len(addresses); i++ {
stream, err := testServiceClient.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
}
p, ok := peer.FromContext(stream.Context())
if !ok {
t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
}
if p.Addr != nil {
gotAddrCount[p.Addr.String()]++
}
}
if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
t.Fatalf("addr count (-got:, +want): %v", diff)
}
}
// TestConcurrentRPCs tests concurrent RPCs on the least request balancer. It
// configures a channel with a least request balancer as the top level
// balancer, and spawns 100 goroutines that each make 5 RPCs concurrently.
// This makes sure no race conditions happen in this scenario.
func (s) TestConcurrentRPCs(t *testing.T) {
addresses := setupBackends(t, 3)
mr := manual.NewBuilderWithScheme("lr-e2e")
defer mr.Close()
// Configure least request as top level balancer of channel.
lrscJSON := `
{
"loadBalancingConfig": [
{
"least_request_experimental": {
"choiceCount": 2
}
}
]
}`
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
firstTwoAddresses := []resolver.Address{
{Addr: addresses[0]},
{Addr: addresses[1]},
}
mr.InitialState(resolver.State{
Addresses: firstTwoAddresses,
ServiceConfig: sc,
})
cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testServiceClient := testgrpc.NewTestServiceClient(cc)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 5; j++ {
testServiceClient.EmptyCall(ctx, &testpb.Empty{})
}
}()
}
wg.Wait()
}
// TestLeastRequestEndpoints_MultipleAddresses tests that the least request
// balancer persists RPC counts once it gets new picker updates and backends
// within an endpoint go down. It first updates the balancer with two
// endpoints having two addresses each, and verifies that requests are round
// robined across the first address of each endpoint. It then stops the
// active backend in endpoint[0] and verifies that the balancer starts using
// the second address in endpoint[0]. The test then creates a bunch of streams
// on the two endpoints. Then, it updates the balancer with three endpoints,
// including the two previous ones. Any created streams should then be started
// on the new endpoint. The test shuts down the active backend in endpoint[1]
// and endpoint[2], and verifies that new RPCs are round robined across the
// remaining active backends in endpoint[1] and endpoint[2].
func (s) TestLeastRequestEndpoints_MultipleAddresses(t *testing.T) {
defer func(u func() uint32) {
randuint32 = u
}(randuint32)
var index int
indexes := []uint32{
0, 0, 1, 1,
}
randuint32 = func() uint32 {
ret := indexes[index%len(indexes)]
index++
return ret
}
backends := startBackends(t, 6)
mr := manual.NewBuilderWithScheme("lr-e2e")
defer mr.Close()
// Configure least request as top level balancer of channel.
lrscJSON := `
{
"loadBalancingConfig": [
{
"least_request_experimental": {
"choiceCount": 2
}
}
]
}`
endpoints := []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: backends[0].Address}, {Addr: backends[1].Address}}},
{Addresses: []resolver.Address{{Addr: backends[2].Address}, {Addr: backends[3].Address}}},
{Addresses: []resolver.Address{{Addr: backends[4].Address}, {Addr: backends[5].Address}}},
}
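// Each endpoint above is managed by a child pickfirst balancer: it connects
// to the endpoint's first address and falls back to the second address if
// that backend goes down, as exercised later in this test.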
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
firstTwoEndpoints := []resolver.Endpoint{endpoints[0], endpoints[1]}
mr.InitialState(resolver.State{
Endpoints: firstTwoEndpoints,
ServiceConfig: sc,
})
cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testServiceClient := testgrpc.NewTestServiceClient(cc)
// Wait for RPCs to round robin across the two backends. This happens because
// a child pickfirst transitioning into READY causes a new picker update. Once
// the picker update with the two backends is present, this test can start to
// populate those backends with streams.
wantAddrs := []resolver.Address{
endpoints[0].Addresses[0],
endpoints[1].Addresses[0],
}
if err := checkRoundRobinRPCs(ctx, testServiceClient, wantAddrs); err != nil {
t.Fatalf("error in expected round robin: %v", err)
}
// Shut down the backend for the first address in endpoints[0]. The child
// pickfirst should fall back to the next address in endpoints[0].
backends[0].Stop()
wantAddrs = []resolver.Address{
endpoints[0].Addresses[1],
endpoints[1].Addresses[0],
}
if err := checkRoundRobinRPCs(ctx, testServiceClient, wantAddrs); err != nil {
t.Fatalf("error in expected round robin: %v", err)
}
// Start 50 streaming RPCs, and leave them unfinished for the duration of
// the test. This will populate the first two endpoints with many active
// RPCs.
for i := 0; i < 50; i++ {
_, err := testServiceClient.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
}
}
// Update the least request balancer to choice count 3. Also update the
// endpoint list, adding a third endpoint. Alongside the injected randomness,
// this should trigger the least request balancer to search all created
// endpoints. Thus, since endpoint 3 is the new endpoint and the first two
// endpoints are populated with RPCs, once the picker update of all 3 READY
// pickfirsts takes effect, all new streams should be started on endpoint 3.
index = 0
indexes = []uint32{
0, 1, 2, 3, 4, 5,
}
lrscJSON = `
{
"loadBalancingConfig": [
{
"least_request_experimental": {
"choiceCount": 3
}
}
]
}`
sc = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lrscJSON)
mr.UpdateState(resolver.State{
Endpoints: endpoints,
ServiceConfig: sc,
})
newAddress := endpoints[2].Addresses[0]
// Poll until only endpoint 3 shows up. This requires a polling loop because
// the picker update containing all three endpoints doesn't take effect
// immediately; it needs the third pickfirst to become READY.
if err := checkRoundRobinRPCs(ctx, testServiceClient, []resolver.Address{newAddress}); err != nil {
t.Fatalf("error in expected round robin: %v", err)
}
// Start 25 RPCs, but don't finish them. They should all start on endpoint 3,
// since the first two endpoints both have 25 RPCs (and randomness
// injection/choiceCount causes all 3 to be compared every iteration).
for i := 0; i < 25; i++ {
stream, err := testServiceClient.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
}
p, ok := peer.FromContext(stream.Context())
if !ok {
t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
}
if p.Addr.String() != newAddress.Addr {
t.Fatalf("testServiceClient.FullDuplexCall's Peer got: %v, want: %v", p.Addr.String(), newAddress)
}
}
// Now that 25 RPCs are active on each endpoint, the next three RPCs should
// round robin, since choiceCount is three and the injected random indexes
// cause the picker to compare the outstanding request counts of all three
// endpoints on each pick.
wantAddrCount := map[string]int{
endpoints[0].Addresses[1].Addr: 1,
endpoints[1].Addresses[0].Addr: 1,
endpoints[2].Addresses[0].Addr: 1,
}
gotAddrCount := make(map[string]int)
for i := 0; i < len(endpoints); i++ {
stream, err := testServiceClient.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("testServiceClient.FullDuplexCall failed: %v", err)
}
p, ok := peer.FromContext(stream.Context())
if !ok {
t.Fatalf("testServiceClient.FullDuplexCall has no Peer")
}
if p.Addr != nil {
gotAddrCount[p.Addr.String()]++
}
}
if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
t.Fatalf("addr count (-got:, +want): %v", diff)
}
// Shut down the active backend for endpoint[1] and endpoint[2]. This should
// result in their streams failing and their outstanding RPC counts dropping,
// while endpoint[0] still holds its earlier unfinished streams. New requests
// should therefore round robin between endpoint[1] and endpoint[2].
backends[2].Stop()
backends[4].Stop()
index = 0
indexes = []uint32{
0, 1, 2, 2, 1, 0,
}
wantAddrs = []resolver.Address{
endpoints[1].Addresses[1],
endpoints[2].Addresses[1],
}
if err := checkRoundRobinRPCs(ctx, testServiceClient, wantAddrs); err != nil {
t.Fatalf("error in expected round robin: %v", err)
}
}
// TestLeastRequestEndpoints_ResolverError tests that the least request
// balancer properly surfaces resolver errors.
func (s) TestLeastRequestEndpoints_ResolverError(t *testing.T) {
const sc = `{"loadBalancingConfig": [{"least_request_experimental": {}}]}`
mr := manual.NewBuilderWithScheme("lr-e2e")
defer mr.Close()
cc, err := grpc.NewClient(
mr.Scheme()+":///",
grpc.WithResolvers(mr),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(sc),
)
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
defer cc.Close()
// We need to pass an endpoint with a valid address to the resolver before
// reporting an error; otherwise endpointsharding does not propagate the
// error.
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("net.Listen() failed: %v", err)
}
// Act like a server that closes the connection without sending a server
// preface.
go func() {
conn, err := lis.Accept()
if err != nil {
t.Errorf("Unexpected error when accepting a connection: %v", err)
}
conn.Close()
}()
mr.UpdateState(resolver.State{
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}},
})
cc.Connect()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
// Report an error through the resolver
resolverErr := fmt.Errorf("simulated resolver error")
mr.CC().ReportError(resolverErr)
// Ensure the client returns the expected resolver error.
testServiceClient := testgrpc.NewTestServiceClient(cc)
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
_, err = testServiceClient.EmptyCall(ctx, &testpb.Empty{})
if strings.Contains(err.Error(), resolverErr.Error()) {
break
}
}
if ctx.Err() != nil {
t.Fatalf("Timeout when waiting for RPCs to fail with error containing %s. Last error: %v", resolverErr, err)
}
}