test: cleanup balancer switching tests (#5271)

Easwar Swaminathan 2022-03-30 12:58:41 -07:00 committed by GitHub
parent b6873c006d
commit 42cadc171d
13 changed files with 945 additions and 909 deletions

View File

@@ -1,569 +0,0 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"context"
"fmt"
"math"
"testing"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
)
var _ balancer.Builder = &magicalLB{}
var _ balancer.Balancer = &magicalLB{}
// magicalLB is a ringer for grpclb. It is used to avoid circular dependencies on the grpclb package.
type magicalLB struct{}
func (b *magicalLB) Name() string {
return "grpclb"
}
func (b *magicalLB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return b
}
func (b *magicalLB) ResolverError(error) {}
func (b *magicalLB) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {}
func (b *magicalLB) UpdateClientConnState(balancer.ClientConnState) error {
return nil
}
func (b *magicalLB) Close() {}
func (b *magicalLB) ExitIdle() {}
func init() {
balancer.Register(&magicalLB{})
}
func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, func()) {
var servers []*server
for i := 0; i < numServers; i++ {
s := newTestServer()
servers = append(servers, s)
go s.start(t, 0, maxStreams)
s.wait(t, 2*time.Second)
}
return servers, func() {
for i := 0; i < numServers; i++ {
servers[i].stop()
}
}
}
func errorDesc(err error) string {
if s, ok := status.FromError(err); ok {
return s.Message()
}
return err.Error()
}
func checkPickFirst(cc *ClientConn, servers []*server) error {
var (
req = "port"
reply string
err error
)
connected := false
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i := 0; i < 5000; i++ {
if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port {
if connected {
// connected is set to false if peer is not server[0]. So if
// connected is true here, this is the second time we saw
// server[0] in a row. Break because pickfirst is in effect.
break
}
connected = true
} else {
connected = false
}
time.Sleep(time.Millisecond)
}
if !connected {
return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port)
}
// The following RPCs should all succeed with the first server.
for i := 0; i < 3; i++ {
err = cc.Invoke(ctx, "/foo/bar", &req, &reply)
if errorDesc(err) != servers[0].port {
return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[0].port, err)
}
}
return nil
}
func checkRoundRobin(cc *ClientConn, servers []*server) error {
var (
req = "port"
reply string
err error
)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Make sure connections to all servers are up.
for i := 0; i < 2; i++ {
// Do this check twice, otherwise the first RPC's transport may still be
// picked by the closing pickfirst balancer, and the test becomes flaky.
for _, s := range servers {
var up bool
for i := 0; i < 5000; i++ {
if err = cc.Invoke(ctx, "/foo/bar", &req, &reply); errorDesc(err) == s.port {
up = true
break
}
time.Sleep(time.Millisecond)
}
if !up {
return fmt.Errorf("server %v is not up within 5 second", s.port)
}
}
}
serverCount := len(servers)
for i := 0; i < 3*serverCount; i++ {
err = cc.Invoke(ctx, "/foo/bar", &req, &reply)
if errorDesc(err) != servers[i%serverCount].port {
return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err)
}
}
return nil
}
func (s) TestSwitchBalancer(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
const numServers = 2
servers, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
r.UpdateState(resolver.State{Addresses: addrs})
// The default balancer is pickfirst.
if err := checkPickFirst(cc, servers); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Switch to roundrobin.
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil)
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
if err := checkPickFirst(cc, servers); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
}
// First addr update contains grpclb.
func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
// ClientConn will switch balancer to grpclb when it receives an address of
// type GRPCLB.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
var isGRPCLB bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isGRPCLB = cc.curBalancerName == "grpclb"
cc.mu.Unlock()
if isGRPCLB {
break
}
time.Sleep(time.Millisecond)
}
if !isGRPCLB {
t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
}
// New update containing new backend and new grpclb. Should not switch
// balancer.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
for i := 0; i < 200; i++ {
cc.mu.Lock()
isGRPCLB = cc.curBalancerName == "grpclb"
cc.mu.Unlock()
if !isGRPCLB {
break
}
time.Sleep(time.Millisecond)
}
if !isGRPCLB {
t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
}
var isPickFirst bool
// Switch balancer to pickfirst.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
}
time.Sleep(time.Millisecond)
}
if !isPickFirst {
t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
}
}
// First addr update does not contain grpclb.
func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
var isPickFirst bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
}
time.Sleep(time.Millisecond)
}
if !isPickFirst {
t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
}
// ClientConn will switch balancer to grpclb when it receives an address of
// type GRPCLB.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}})
var isGRPCLB bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isGRPCLB = cc.curBalancerName == "grpclb"
cc.mu.Unlock()
if isGRPCLB {
break
}
time.Sleep(time.Millisecond)
}
if !isGRPCLB {
t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
}
// New update containing new backend and new grpclb. Should not switch
// balancer.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}})
for i := 0; i < 200; i++ {
cc.mu.Lock()
isGRPCLB = cc.curBalancerName == "grpclb"
cc.mu.Unlock()
if !isGRPCLB {
break
}
time.Sleep(time.Millisecond)
}
if !isGRPCLB {
t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb")
}
// Switch balancer back.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
}
time.Sleep(time.Millisecond)
}
if !isPickFirst {
t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
}
}
// Test that if the current balancer is roundrobin, after switching to grpclb,
// when the resolved address doesn't contain grpclb addresses, balancer will be
// switched back to roundrobin.
func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
var isRoundRobin bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isRoundRobin = cc.curBalancerName == "round_robin"
cc.mu.Unlock()
if isRoundRobin {
break
}
time.Sleep(time.Millisecond)
}
if !isRoundRobin {
t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
}
// ClientConn will switch balancer to grpclb when it receives an address of
// type GRPCLB.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}, ServiceConfig: sc})
var isGRPCLB bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isGRPCLB = cc.curBalancerName == "grpclb"
cc.mu.Unlock()
if isGRPCLB {
break
}
time.Sleep(time.Millisecond)
}
if !isGRPCLB {
t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
}
// Switch balancer back.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isRoundRobin = cc.curBalancerName == "round_robin"
cc.mu.Unlock()
if isRoundRobin {
break
}
time.Sleep(time.Millisecond)
}
if !isRoundRobin {
t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
}
}
// Test that if resolved address list contains grpclb, the balancer option in
// service config won't take effect. But when there's no grpclb address in a new
// resolved address list, balancer will be switched to the new one.
func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}})
var isPickFirst bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isPickFirst = cc.curBalancerName == PickFirstBalancerName
cc.mu.Unlock()
if isPickFirst {
break
}
time.Sleep(time.Millisecond)
}
if !isPickFirst {
t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName)
}
// ClientConn will switch balancer to grpclb when it receives an address of
// type GRPCLB.
addrs := []resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}
r.UpdateState(resolver.State{Addresses: addrs})
var isGRPCLB bool
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isGRPCLB = cc.curBalancerName == "grpclb"
cc.mu.Unlock()
if isGRPCLB {
break
}
time.Sleep(time.Millisecond)
}
if !isGRPCLB {
t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
}
sc := parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
var isRoundRobin bool
for i := 0; i < 200; i++ {
cc.mu.Lock()
isRoundRobin = cc.curBalancerName == "round_robin"
cc.mu.Unlock()
if isRoundRobin {
break
}
time.Sleep(time.Millisecond)
}
// Balancer should NOT switch to round_robin because resolved list contains
// grpclb.
if isRoundRobin {
t.Fatalf("within 200 ms, cc.balancer switched to round_robin, want grpclb")
}
// Switch balancer back.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
for i := 0; i < 5000; i++ {
cc.mu.Lock()
isRoundRobin = cc.curBalancerName == "round_robin"
cc.mu.Unlock()
if isRoundRobin {
break
}
time.Sleep(time.Millisecond)
}
if !isRoundRobin {
t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
}
}
// Test that when switching to grpclb fails because grpclb is not registered,
// the fallback balancer will only get backend addresses, not the grpclb server
// address.
//
// The test sends 3 server addresses (all backends) as resolved addresses, but
// claims the first one is a grpclb server. All RPCs should be sent to the
// other addresses, not the first one.
func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
internal.BalancerUnregister("grpclb")
defer balancer.Register(&magicalLB{})
r := manual.NewBuilderWithScheme("whatever")
const numServers = 3
servers, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}})
// The default balancer is pickfirst.
if err := checkPickFirst(cc, servers[1:]); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Try switching to grpclb by sending servers[0] as grpclb address. It's
// expected that servers[0] will be filtered out, so it will not be used by
// the balancer.
//
// If the filtering failed, servers[0] will be used for RPCs and the RPCs
// will succeed. The following checks will catch this and fail.
addrs := []resolver.Address{
{Addr: servers[0].addr, Type: resolver.GRPCLB},
{Addr: servers[1].addr}, {Addr: servers[2].addr}}
r.UpdateState(resolver.State{Addresses: addrs})
// Still check for pickfirst, but only with server[1] and server[2].
if err := checkPickFirst(cc, servers[1:]); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Switch to roundrobin, and check against server[1] and server[2].
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs}, nil)
if err := checkRoundRobin(cc, servers[1:]); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
}
const inlineRemoveSubConnBalancerName = "test-inline-remove-subconn-balancer"
func init() {
stub.Register(inlineRemoveSubConnBalancerName, stub.BalancerFuncs{
Close: func(data *stub.BalancerData) {
data.ClientConn.RemoveSubConn(&acBalancerWrapper{})
},
})
}
// Test that when switching to balancers, the old balancer calls RemoveSubConn
// in Close.
//
// This test is to make sure this close doesn't cause a deadlock.
func (s) TestSwitchBalancerOldRemoveSubConn(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, inlineRemoveSubConnBalancerName))}, nil)
// This service config update will switch balancer from
// "test-inline-remove-subconn-balancer" to "pick_first". The test balancer
// will be closed, which will call cc.RemoveSubConn() inline (this
// RemoveSubConn is not required by the API, but some balancers might do
// it).
//
// This is to make sure the cc.RemoveSubConn() from Close() doesn't cause a
// deadlock (e.g. trying to grab a mutex while it's already locked).
//
// Do it in a goroutine so this test will fail with a helpful message
// (though the goroutine will still leak).
done := make(chan struct{})
go func() {
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`)}, nil)
close(done)
}()
select {
case <-time.After(defaultTestTimeout):
t.Fatalf("timeout waiting for updateResolverState to finish")
case <-done:
}
}
func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
scpr := r.CC.ParseServiceConfig(s)
if scpr.Err != nil {
panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err))
}
return scpr
}

View File

@@ -1,211 +0,0 @@
/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"context"
"fmt"
"io"
"math"
"net"
"strconv"
"strings"
"sync"
"testing"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/status"
)
var (
expectedRequest = "ping"
expectedResponse = "pong"
weirdError = "format verbs: %v%s"
sizeLargeErr = 1024 * 1024
canceled = 0
)
const defaultTestTimeout = 10 * time.Second
type testCodec struct {
}
func (testCodec) Marshal(v interface{}) ([]byte, error) {
return []byte(*(v.(*string))), nil
}
func (testCodec) Unmarshal(data []byte, v interface{}) error {
*(v.(*string)) = string(data)
return nil
}
func (testCodec) String() string {
return "test"
}
type testStreamHandler struct {
port string
t transport.ServerTransport
}
func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
p := &parser{r: s}
for {
pf, req, err := p.recvMsg(math.MaxInt32)
if err == io.EOF {
break
}
if err != nil {
return
}
if pf != compressionNone {
t.Errorf("Received the mistaken message format %d, want %d", pf, compressionNone)
return
}
var v string
codec := testCodec{}
if err := codec.Unmarshal(req, &v); err != nil {
t.Errorf("Failed to unmarshal the received message: %v", err)
return
}
if v == "weird error" {
h.t.WriteStatus(s, status.New(codes.Internal, weirdError))
return
}
if v == "canceled" {
canceled++
h.t.WriteStatus(s, status.New(codes.Internal, ""))
return
}
if v == "port" {
h.t.WriteStatus(s, status.New(codes.Internal, h.port))
return
}
if v != expectedRequest {
h.t.WriteStatus(s, status.New(codes.Internal, strings.Repeat("A", sizeLargeErr)))
return
}
}
// send a response back to end the stream.
data, err := encode(testCodec{}, &expectedResponse)
if err != nil {
t.Errorf("Failed to encode the response: %v", err)
return
}
hdr, payload := msgHeader(data, nil)
h.t.Write(s, hdr, payload, &transport.Options{})
h.t.WriteStatus(s, status.New(codes.OK, ""))
}
type server struct {
lis net.Listener
port string
addr string
startedErr chan error // sent nil or an error after server starts
mu sync.Mutex
conns map[transport.ServerTransport]bool
channelzID *channelz.Identifier
}
func newTestServer() *server {
return &server{
startedErr: make(chan error, 1),
channelzID: channelz.NewIdentifierForTesting(channelz.RefServer, time.Now().Unix(), nil),
}
}
// start starts the server. Other goroutines should block on s.startedErr for further operations.
func (s *server) start(t *testing.T, port int, maxStreams uint32) {
var err error
if port == 0 {
s.lis, err = net.Listen("tcp", "localhost:0")
} else {
s.lis, err = net.Listen("tcp", "localhost:"+strconv.Itoa(port))
}
if err != nil {
s.startedErr <- fmt.Errorf("failed to listen: %v", err)
return
}
s.addr = s.lis.Addr().String()
_, p, err := net.SplitHostPort(s.addr)
if err != nil {
s.startedErr <- fmt.Errorf("failed to parse listener address: %v", err)
return
}
s.port = p
s.conns = make(map[transport.ServerTransport]bool)
s.startedErr <- nil
for {
conn, err := s.lis.Accept()
if err != nil {
return
}
config := &transport.ServerConfig{
MaxStreams: maxStreams,
ChannelzParentID: s.channelzID,
}
st, err := transport.NewServerTransport(conn, config)
if err != nil {
t.Errorf("failed to create server transport: %v", err)
continue
}
s.mu.Lock()
if s.conns == nil {
s.mu.Unlock()
st.Close()
return
}
s.conns[st] = true
s.mu.Unlock()
h := &testStreamHandler{
port: s.port,
t: st,
}
go st.HandleStreams(func(s *transport.Stream) {
go h.handleStream(t, s)
}, func(ctx context.Context, method string) context.Context {
return ctx
})
}
}
func (s *server) wait(t *testing.T, timeout time.Duration) {
select {
case err := <-s.startedErr:
if err != nil {
t.Fatal(err)
}
case <-time.After(timeout):
t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
}
}
func (s *server) stop() {
s.lis.Close()
s.mu.Lock()
for c := range s.conns {
c.Close()
}
s.conns = nil
s.mu.Unlock()
}

View File

@@ -672,14 +672,14 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
cc.mu.Unlock()
if cbn != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
s.Addresses = s.Addresses[:len(s.Addresses)-1]
var addrs []resolver.Address
for _, addr := range s.Addresses {
if addr.Type == resolver.GRPCLB {
continue
}
i++
addrs = append(addrs, addr)
}
s.Addresses = addrs
}
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
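The hunk above replaces in-place element removal with an append-based filter when dropping addresses of type GRPCLB. As a standalone illustration of that pattern (a sketch only, not the ClientConn's actual code; the helper name filterGRPCLB is invented here, and it assumes the grpc-go version in this change, where resolver.Address still carries the deprecated Type field):

package main

import (
	"fmt"

	"google.golang.org/grpc/resolver"
)

// filterGRPCLB returns a copy of addrs with addresses of type GRPCLB removed,
// mirroring the append-based filtering used in updateResolverState above.
func filterGRPCLB(addrs []resolver.Address) []resolver.Address {
	var filtered []resolver.Address
	for _, addr := range addrs {
		if addr.Type == resolver.GRPCLB {
			continue
		}
		filtered = append(filtered, addr)
	}
	return filtered
}

func main() {
	addrs := []resolver.Address{
		{Addr: "backend-1"},
		{Addr: "lb-server", Type: resolver.GRPCLB},
		{Addr: "backend-2"},
	}
	for _, addr := range filterGRPCLB(addrs) {
		fmt.Println(addr.Addr) // Prints backend-1 and backend-2.
	}
}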

View File

@@ -35,7 +35,10 @@ import (
"google.golang.org/grpc/resolver/manual"
)
const stateRecordingBalancerName = "state_recoding_balancer"
const (
stateRecordingBalancerName = "state_recoding_balancer"
defaultTestTimeout = 10 * time.Second
)
var testBalancerBuilder = newStateRecordingBalancerBuilder()

View File

@@ -40,9 +40,18 @@ import (
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/testdata"
)
func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
scpr := r.CC.ParseServiceConfig(s)
if scpr.Err != nil {
panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err))
}
return scpr
}
func (s) TestDialWithTimeout(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {

View File

@@ -0,0 +1,249 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package fakegrpclb provides a fake implementation of the grpclb server.
package fakegrpclb
import (
"errors"
"fmt"
"io"
"net"
"strconv"
"sync"
"time"
"google.golang.org/grpc"
lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/status"
)
var logger = grpclog.Component("fake_grpclb")
// ServerParams wraps options passed while creating a Server.
type ServerParams struct {
ListenPort int // Listening port for the balancer server.
ServerOptions []grpc.ServerOption // gRPC options for the balancer server.
LoadBalancedServiceName string // Service name being load balanced for.
LoadBalancedServicePort int // Service port being load balanced for.
BackendAddresses []string // Service backends to balance load across.
ShortStream bool // End balancer stream after sending server list.
}
// Server is a fake implementation of the grpclb LoadBalancer service. It does
// not support stats reporting from clients, and always sends back a static list
// of backends to the client to balance load across.
//
// It is safe for concurrent access.
type Server struct {
lbgrpc.UnimplementedLoadBalancerServer
// Options copied over from ServerParams passed to NewServer.
sOpts []grpc.ServerOption // gRPC server options.
serviceName string // Service name being load balanced for.
servicePort int // Service port being load balanced for.
shortStream bool // End balancer stream after sending server list.
// Values initialized using ServerParams passed to NewServer.
backends []*lbpb.Server // Service backends to balance load across.
lis net.Listener // Listener for grpc connections to the LoadBalancer service.
// mu guards access to the fields below.
mu sync.Mutex
grpcServer *grpc.Server // Underlying grpc server.
address string // Actual listening address.
stopped chan struct{} // Closed when Stop() is called.
}
// NewServer creates a new Server with passed in params. Returns a non-nil error
// if the params are invalid.
func NewServer(params ServerParams) (*Server, error) {
var servers []*lbpb.Server
for _, addr := range params.BackendAddresses {
ipStr, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, fmt.Errorf("failed to parse list of backend address %q: %v", addr, err)
}
ip := net.ParseIP(ipStr)
if ip == nil {
return nil, fmt.Errorf("failed to parse ip: %q", ipStr)
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, fmt.Errorf("failed to convert port %q to int", portStr)
}
logger.Infof("Adding backend ip: %q, port: %d to server list", ip.String(), port)
servers = append(servers, &lbpb.Server{
IpAddress: ip,
Port: int32(port),
})
}
lis, err := net.Listen("tcp", ":"+strconv.Itoa(params.ListenPort))
if err != nil {
return nil, fmt.Errorf("failed to listen on port %q: %v", params.ListenPort, err)
}
return &Server{
sOpts: params.ServerOptions,
serviceName: params.LoadBalancedServiceName,
servicePort: params.LoadBalancedServicePort,
shortStream: params.ShortStream,
backends: servers,
lis: lis,
address: lis.Addr().String(),
stopped: make(chan struct{}),
}, nil
}
// Serve starts serving the LoadBalancer service on a gRPC server.
//
// It returns early with a non-nil error if it is unable to start serving.
// Otherwise, it blocks until Stop() is called, at which point it returns the
// error returned by the underlying grpc.Server's Serve() method.
func (s *Server) Serve() error {
s.mu.Lock()
if s.grpcServer != nil {
s.mu.Unlock()
return errors.New("Serve() called multiple times")
}
server := grpc.NewServer(s.sOpts...)
s.grpcServer = server
s.mu.Unlock()
logger.Infof("Begin listening on %s", s.lis.Addr().String())
lbgrpc.RegisterLoadBalancerServer(server, s)
return server.Serve(s.lis) // This call will block.
}
// Stop stops serving the LoadBalancer service and unblocks the preceding call
// to Serve().
func (s *Server) Stop() {
defer close(s.stopped)
s.mu.Lock()
if s.grpcServer != nil {
s.grpcServer.Stop()
s.grpcServer = nil
}
s.mu.Unlock()
}
// Address returns the host:port on which the LoadBalancer service is serving.
func (s *Server) Address() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.address
}
// BalanceLoad provides a fake implementation of the LoadBalancer service.
func (s *Server) BalanceLoad(stream lbgrpc.LoadBalancer_BalanceLoadServer) error {
logger.Info("New BalancerLoad stream started")
req, err := stream.Recv()
if err == io.EOF {
logger.Warning("Received EOF when reading from the stream")
return nil
}
if err != nil {
logger.Warning("Failed to read LoadBalanceRequest from stream: %v", err)
return err
}
logger.Infof("Received LoadBalancerRequest:\n%s", pretty.ToJSON(req))
// Initial request contains the service being load balanced for.
initialReq := req.GetInitialRequest()
if initialReq == nil {
logger.Info("First message on the stream does not contain an InitialLoadBalanceRequest")
return status.Error(codes.Unknown, "First request not an InitialLoadBalanceRequest")
}
// Basic validation of the service name and port from the incoming request.
//
// Clients targeting service:port can sometimes include the ":port" suffix in
// their requested names; handle this case.
serviceName, port, err := net.SplitHostPort(initialReq.Name)
if err != nil {
// Requested name did not contain a port. So, use the name as is.
serviceName = initialReq.Name
} else {
p, err := strconv.Atoi(port)
if err != nil {
logger.Info("Failed to parse requested service port %q to integer", port)
return status.Error(codes.Unknown, "Bad requested service port number")
}
if p != s.servicePort {
logger.Info("Requested service port number %q does not match expected", port, s.servicePort)
return status.Error(codes.Unknown, "Bad requested service port number")
}
}
if serviceName != s.serviceName {
logger.Info("Requested service name %q does not match expected %q", serviceName, s.serviceName)
return status.Error(codes.NotFound, "Bad requested service name")
}
// Empty initial response disables stats reporting from the client. Stats
// reporting from the client is used to determine backend load and is not
// required for the purposes of this fake.
initResp := &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{
InitialResponse: &lbpb.InitialLoadBalanceResponse{},
},
}
if err := stream.Send(initResp); err != nil {
logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err)
return err
}
resp := &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
ServerList: &lbpb.ServerList{Servers: s.backends},
},
}
logger.Infof("Sending response with server list: %s", pretty.ToJSON(resp))
if err := stream.Send(resp); err != nil {
logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err)
return err
}
if s.shortStream {
logger.Info("Ending stream early as the short stream option was set")
return nil
}
for {
select {
case <-stream.Context().Done():
return nil
case <-s.stopped:
return nil
case <-time.After(10 * time.Second):
logger.Infof("Sending response with server list: %s", pretty.ToJSON(resp))
if err := stream.Send(resp); err != nil {
logger.Warningf("Failed to send InitialLoadBalanceResponse on the stream: %v", err)
return err
}
}
}
}
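NewServer, Serve, Stop and Address are driven by tests in the pattern used by setupBackendsAndFakeGRPCLB later in this change: build the fake, run Serve in a goroutine, and Stop it during cleanup. A rough sketch follows (usable only from within the grpc-go module since the package is internal; the backend address below is a placeholder):

package fakegrpclb_test

import (
	"testing"

	"google.golang.org/grpc/internal/testutils/fakegrpclb"
)

// TestFakeGRPCLBUsage sketches how the balancer switching tests drive the
// fake grpclb server. The backend address is a placeholder.
func TestFakeGRPCLBUsage(t *testing.T) {
	lb, err := fakegrpclb.NewServer(fakegrpclb.ServerParams{
		LoadBalancedServiceName: "foo.bar.service",
		LoadBalancedServicePort: 443,
		BackendAddresses:        []string{"127.0.0.1:50051"},
	})
	if err != nil {
		t.Fatalf("fakegrpclb.NewServer() failed: %v", err)
	}
	go func() {
		if err := lb.Serve(); err != nil {
			t.Errorf("fakegrpclb Serve() failed: %v", err)
		}
	}()
	defer lb.Stop()

	// lb.Address() is the host:port to push to the client under test as an
	// address of type resolver.GRPCLB.
	t.Logf("fake grpclb serving at %s", lb.Address())
}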

View File

@@ -23,18 +23,13 @@ package main
import (
"flag"
"net"
"strconv"
"strings"
"time"
"google.golang.org/grpc"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/alts"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
"google.golang.org/grpc/internal/testutils/fakegrpclb"
"google.golang.org/grpc/testdata"
)
@@ -49,69 +44,9 @@ var (
logger = grpclog.Component("interop")
)
type loadBalancerServer struct {
lbpb.UnimplementedLoadBalancerServer
serverListResponse *lbpb.LoadBalanceResponse
}
func (l *loadBalancerServer) BalanceLoad(stream lbpb.LoadBalancer_BalanceLoadServer) error {
logger.Info("Begin handling new BalancerLoad request.")
var lbReq *lbpb.LoadBalanceRequest
var err error
if lbReq, err = stream.Recv(); err != nil {
logger.Errorf("Error receiving LoadBalanceRequest: %v", err)
return err
}
logger.Info("LoadBalancerRequest received.")
initialReq := lbReq.GetInitialRequest()
if initialReq == nil {
logger.Info("Expected first request to be an InitialRequest. Got: %v", lbReq)
return status.Error(codes.Unknown, "First request not an InitialRequest")
}
// gRPC clients targeting foo.bar.com:443 can sometimes include the ":443" suffix in
// their requested names; handle this case. TODO: make 443 configurable?
var cleanedName string
var requestedNamePortNumber string
if cleanedName, requestedNamePortNumber, err = net.SplitHostPort(initialReq.Name); err != nil {
cleanedName = initialReq.Name
} else {
if requestedNamePortNumber != "443" {
logger.Info("Bad requested service name port number: %v.", requestedNamePortNumber)
return status.Error(codes.Unknown, "Bad requested service name port number")
}
}
if cleanedName != *serviceName {
logger.Info("Expected requested service name: %v. Got: %v", *serviceName, initialReq.Name)
return status.Error(codes.NotFound, "Bad requested service name")
}
if err := stream.Send(&lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_InitialResponse{
InitialResponse: &lbpb.InitialLoadBalanceResponse{},
},
}); err != nil {
logger.Errorf("Error sending initial LB response: %v", err)
return status.Error(codes.Unknown, "Error sending initial response")
}
logger.Info("Send LoadBalanceResponse: %v", l.serverListResponse)
if err := stream.Send(l.serverListResponse); err != nil {
logger.Errorf("Error sending LB response: %v", err)
return status.Error(codes.Unknown, "Error sending response")
}
if *shortStream {
return nil
}
for {
logger.Info("Send LoadBalanceResponse: %v", l.serverListResponse)
if err := stream.Send(l.serverListResponse); err != nil {
logger.Errorf("Error sending LB response: %v", err)
return status.Error(codes.Unknown, "Error sending response")
}
time.Sleep(10 * time.Second)
}
}
func main() {
flag.Parse()
var opts []grpc.ServerOption
if *useTLS {
certFile := testdata.Path("server1.pem")
@@ -126,47 +61,23 @@ func main() {
altsTC := alts.NewServerCreds(altsOpts)
opts = append(opts, grpc.Creds(altsTC))
}
var serverList []*lbpb.Server
if len(*backendAddrs) == 0 {
serverList = make([]*lbpb.Server, 0)
} else {
rawBackendAddrs := strings.Split(*backendAddrs, ",")
serverList = make([]*lbpb.Server, len(rawBackendAddrs))
for i := range rawBackendAddrs {
rawIP, rawPort, err := net.SplitHostPort(rawBackendAddrs[i])
if err != nil {
logger.Fatalf("Failed to parse --backend_addrs[%d]=%v, error: %v", i, rawBackendAddrs[i], err)
}
ip := net.ParseIP(rawIP)
if ip == nil {
logger.Fatalf("Failed to parse ip: %v", rawIP)
}
numericPort, err := strconv.Atoi(rawPort)
if err != nil {
logger.Fatalf("Failed to convert port %v to int", rawPort)
}
logger.Infof("Adding backend ip: %v, port: %d", ip.String(), numericPort)
serverList[i] = &lbpb.Server{
IpAddress: ip,
Port: int32(numericPort),
}
}
}
serverListResponse := &lbpb.LoadBalanceResponse{
LoadBalanceResponseType: &lbpb.LoadBalanceResponse_ServerList{
ServerList: &lbpb.ServerList{
Servers: serverList,
},
},
}
server := grpc.NewServer(opts...)
logger.Infof("Begin listening on %d.", *port)
lis, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
if err != nil {
logger.Fatalf("Failed to listen on port %v: %v", *port, err)
}
lbpb.RegisterLoadBalancerServer(server, &loadBalancerServer{
serverListResponse: serverListResponse,
rawBackendAddrs := strings.Split(*backendAddrs, ",")
server, err := fakegrpclb.NewServer(fakegrpclb.ServerParams{
ListenPort: *port,
ServerOptions: opts,
LoadBalancedServiceName: *serviceName,
LoadBalancedServicePort: 443, // TODO: make this configurable?
BackendAddresses: rawBackendAddrs,
ShortStream: *shortStream,
})
server.Serve(lis)
if err != nil {
logger.Fatalf("Failed to create balancer server: %v", err)
}
// Serve() starts serving and blocks until Stop() is called. We don't need to
// call Stop() here since we want the server to run until we are killed.
if err := server.Serve(); err != nil {
logger.Fatalf("Failed to start balancer server: %v", err)
}
}

View File

@@ -87,7 +87,7 @@ func loadFileDesc(filename string) (*descriptorpb.FileDescriptorProto, []byte) {
func loadFileDescDynamic(b []byte) (*descriptorpb.FileDescriptorProto, protoreflect.FileDescriptor, []byte) {
m := new(descriptorpb.FileDescriptorProto)
if err := proto.Unmarshal(b, m); err != nil {
panic(fmt.Sprintf("failed to unmarshal dynamic proto raw descriptor"))
panic("failed to unmarshal dynamic proto raw descriptor")
}
fd, err := protodesc.NewFile(m, nil)

View File

@@ -28,12 +28,20 @@ import (
"time"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/status"
)
type emptyServiceServer interface{}
type testServer struct{}
func errorDesc(err error) string {
if s, ok := status.FromError(err); ok {
return s.Message()
}
return err.Error()
}
func (s) TestStopBeforeServe(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {

View File

@@ -0,0 +1,636 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test
import (
"context"
"fmt"
"strings"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/fakegrpclb"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
testpb "google.golang.org/grpc/test/grpc_testing"
)
const (
loadBalancedServiceName = "foo.bar.service"
loadBalancedServicePort = 443
wantGRPCLBTraceDesc = `Channel switches to new LB policy "grpclb"`
wantRoundRobinTraceDesc = `Channel switches to new LB policy "round_robin"`
wantPickFirstTraceDesc = `Channel switches to new LB policy "pick_first"`
)
// setupBackendsAndFakeGRPCLB sets up the stub server backends and a fake grpclb
// server for tests which exercise balancer switch scenarios involving grpclb.
// Returns a cleanup function to be invoked by the caller.
func setupBackendsAndFakeGRPCLB(t *testing.T) ([]*stubserver.StubServer, *fakegrpclb.Server, func()) {
czCleanup := channelz.NewChannelzStorageForTesting()
backends, backendsCleanup := startBackendsForBalancerSwitch(t)
rawAddrs := stubBackendsToRawAddrs(backends)
lbServer, err := fakegrpclb.NewServer(fakegrpclb.ServerParams{
LoadBalancedServiceName: loadBalancedServiceName,
LoadBalancedServicePort: loadBalancedServicePort,
BackendAddresses: rawAddrs,
})
if err != nil {
t.Fatalf("failed to create fake grpclb server: %v", err)
}
go func() {
if err := lbServer.Serve(); err != nil {
t.Errorf("fake grpclb Serve() failed: %v", err)
}
}()
return backends, lbServer, func() {
backendsCleanup()
lbServer.Stop()
czCleanupWrapper(czCleanup, t)
}
}
// startBackendsForBalancerSwitch spins up a bunch of stub server backends
// exposing the TestService. Returns a cleanup function to be invoked by the
// caller.
func startBackendsForBalancerSwitch(t *testing.T) ([]*stubserver.StubServer, func()) {
t.Helper()
const backendCount = 3
backends := make([]*stubserver.StubServer, backendCount)
for i := 0; i < backendCount; i++ {
backend := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
}
if err := backend.StartServer(); err != nil {
t.Fatalf("Failed to start backend: %v", err)
}
t.Logf("Started TestService backend at: %q", backend.Address)
backends[i] = backend
}
return backends, func() {
for _, b := range backends {
b.Stop()
}
}
}
// stubBackendsToRawAddrs converts from a set of stub server backends to raw
// address strings. Useful when pushing addresses to the fake grpclb server.
func stubBackendsToRawAddrs(backends []*stubserver.StubServer) []string {
addrs := make([]string, len(backends))
for i, backend := range backends {
addrs[i] = backend.Address
}
return addrs
}
// checkForTraceEvent looks for a trace event in the top level channel matching
// the given description. Events that occur before since are ignored. Returns a nil error if
// such an event is found.
func checkForTraceEvent(ctx context.Context, wantDesc string, since time.Time) error {
for {
if err := ctx.Err(); err != nil {
return err
}
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return fmt.Errorf("channelz returned %d top channels, want 1", len(tcs))
}
for _, event := range tcs[0].Trace.Events {
if event.Timestamp.Before(since) {
continue
}
if strings.Contains(event.Desc, wantDesc) {
return nil
}
}
time.Sleep(defaultTestShortTimeout)
}
}
// TestBalancerSwitch_Basic tests the basic scenario of switching from one LB
// policy to another, as specified in the service config.
func (s) TestBalancerSwitch_Basic(t *testing.T) {
backends, cleanup := startBackendsForBalancerSwitch(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update without an LB policy in the service config. The
// channel should pick the default LB policy, which is pick_first.
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
t.Fatal(err)
}
// Push a resolver update with the service config specifying "round_robin".
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
// Push a resolver update with the service config specifying "pick_first".
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: parseServiceConfig(t, r, pickFirstServiceConfig),
})
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
t.Fatal(err)
}
}
// TestBalancerSwitch_grpclbToPickFirst tests the scenario where the channel
// starts off "grpclb", switches to "pick_first" and back.
func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, and will equally distribute RPCs across
// the backends as the fake grpclb server does not support load reporting from
// the clients.
now := time.Now()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
// Push a resolver update containing a non-existent grpclb server address.
// This should not lead to a balancer switch.
now = time.Now()
const nonExistentServer = "non-existent-grpclb-server-address"
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: nonExistentServer, Type: resolver.GRPCLB}}})
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
wantDesc := fmt.Sprintf("Channel switches to new LB policy %q", nonExistentServer)
if err := checkForTraceEvent(sCtx, wantDesc, now); err == nil {
t.Fatal("channel switched balancers when expected not to")
}
// Push a resolver update containing no grpclb server address. This should
// lead to the channel using the default LB policy which is pick_first.
now = time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
t.Fatal(err)
}
if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err)
}
}
// TestBalancerSwitch_pickFirstToGRPCLB tests the scenario where the channel
// starts off with "pick_first", switches to "grpclb" and back.
func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) {
backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update containing no grpclb server address. This should
// lead to the channel using the default LB policy which is pick_first.
now := time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err)
}
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
t.Fatal(err)
}
// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, and will equally distribute RPCs across
// the backends as the fake grpclb server does not support load reporting from
// the clients.
now = time.Now()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
// Push a resolver update containing a non-existent grpclb server address.
// This should not lead to a balancer switch.
now = time.Now()
const nonExistentServer = "non-existent-grpclb-server-address"
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: nonExistentServer, Type: resolver.GRPCLB}}})
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
wantDesc := fmt.Sprintf("Channel switches to new LB policy %q", nonExistentServer)
if err := checkForTraceEvent(sCtx, wantDesc, now); err == nil {
t.Fatal("channel switched balancers when expected not to")
}
// Switch to "pick_first" again by sending no grpclb server addresses.
now = time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err)
}
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
t.Fatal(err)
}
}
// TestBalancerSwitch_RoundRobinToGRPCLB tests the scenario where the channel
// starts off with "round_robin", switches to "grpclb" and back.
//
// Note that this test uses the deprecated `loadBalancingPolicy` field in the
// service config.
func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) {
backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Note the use of the deprecated `loadBalancingPolicy` field here instead
// of the now recommended `loadBalancingConfig` field. The logic in the
// ClientConn which decides which balancer to switch to looks at the
// following places in the given order of preference:
// - `loadBalancingConfig` field
// - addresses of type grpclb
// - `loadBalancingPolicy` field
// If we use the `loadBalancingPolicy` field, the switch to "grpclb" later on
// in the test will not happen as the ClientConn will continue to use the LB
// policy received in the first update.
scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`)
// Push a resolver update with the service config specifying "round_robin".
now := time.Now()
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: scpr,
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, and will equally distribute RPCs across
// the backends as the fake grpclb server does not support load reporting from
// the clients.
now = time.Now()
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}},
ServiceConfig: scpr,
})
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
// Switch back to "round_robin".
now = time.Now()
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: scpr,
})
if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
}
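The order of preference described in the comment at the top of this test (loadBalancingConfig, then addresses of type grpclb, then the deprecated loadBalancingPolicy, then the pick_first default) can be condensed into a small selection helper. This is only an illustrative sketch, not the ClientConn's actual implementation; pickLBPolicy and its parameters are invented for the example:

package main

import (
	"fmt"

	"google.golang.org/grpc/resolver"
)

// pickLBPolicy sketches the order of preference described above. lbConfig is
// the policy chosen via the loadBalancingConfig field (empty if unset) and
// lbPolicy is the deprecated loadBalancingPolicy field (empty if unset).
func pickLBPolicy(lbConfig, lbPolicy string, addrs []resolver.Address) string {
	if lbConfig != "" {
		return lbConfig // loadBalancingConfig wins over everything else.
	}
	for _, addr := range addrs {
		if addr.Type == resolver.GRPCLB {
			return "grpclb" // grpclb addresses win over loadBalancingPolicy.
		}
	}
	if lbPolicy != "" {
		return lbPolicy
	}
	return "pick_first" // Default when nothing else is specified.
}

func main() {
	addrs := []resolver.Address{{Addr: "lb-server", Type: resolver.GRPCLB}}
	fmt.Println(pickLBPolicy("", "round_robin", addrs)) // grpclb
	fmt.Println(pickLBPolicy("round_robin", "", addrs)) // round_robin
}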
// TestBalancerSwitch_grpclbNotRegistered tests the scenario where the grpclb
// balancer is not registered. Verifies that the ClientConn falls back to the
// default LB policy or the LB policy specified in the service config, and that
// addresses of type "grpclb" are filtered out.
func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) {
// Unregister the grpclb balancer builder for the duration of this test.
grpclbBuilder := balancer.Get("grpclb")
internal.BalancerUnregister(grpclbBuilder.Name())
defer balancer.Register(grpclbBuilder)
backends, cleanup := startBackendsForBalancerSwitch(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update which contains a bunch of stub server backends and a
// grpclb server address. The latter should get the ClientConn to try and
// apply the grpclb policy. But since grpclb is not registered, it should
// fall back to the default LB policy which is pick_first. The ClientConn is
// also expected to filter out the grpclb address when sending the addresses
// list to pick_first.
grpclbAddr := []resolver.Address{{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB}}
addrs = append(grpclbAddr, addrs...)
now := time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkPickFirst(ctx, cc, addrs[1].Addr); err != nil {
t.Fatal(err)
}
if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err)
}
// Push a resolver update with the same addresses, but with a service config
// specifying "round_robin". The ClientConn is expected to filter out the
// grpclb address when sending the addresses list to round_robin.
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil {
t.Fatal(err)
}
}
// TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy verifies that
// if the resolver update contains any addresses of type "grpclb", it overrides
// the LB policy specified in the deprecated `loadBalancingPolicy` field of the
// service config.
func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing.T) {
backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update containing no grpclb server address. This should
// lead to the channel using the default LB policy which is pick_first.
now := time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err)
}
if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
t.Fatal(err)
}
// Push a resolver update with no service config. The addresses list contains
// the stub backend addresses and a single address pointing to the grpclb
// server we created above. This will cause the channel to switch to the
// "grpclb" balancer, and will equally distribute RPCs across the backends.
now = time.Now()
r.UpdateState(resolver.State{
Addresses: append(addrs, resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}),
})
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
// Push a resolver update with a service config using the deprecated
// `loadBalancingPolicy` field pointing to round_robin. The addresses list
// contains an address of type "grpclb". This should be preferred and hence
// there should be no balancer switch.
scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`)
now = time.Now()
r.UpdateState(resolver.State{
Addresses: append(addrs, resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}),
ServiceConfig: scpr,
})
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if err := checkForTraceEvent(sCtx, wantRoundRobinTraceDesc, now); err == nil {
t.Fatal("channel switched balancers when expected not to")
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
// Switch to "round_robin" by removing the address of type "grpclb".
now = time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err)
}
}
// TestBalancerSwitch_LoadBalancingConfigTrumps verifies that the
// `loadBalancingConfig` field in the service config trumps addresses of
// type "grpclb" when it comes to deciding which LB policy is applied on the
// channel.
func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) {
backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
defer cleanup()
addrs := stubBackendsToResolverAddrs(backends)
r := manual.NewBuilderWithScheme("whatever")
target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update with no service config and a single address pointing
// to the grpclb server we created above. This will cause the channel to
// switch to the "grpclb" balancer, and will equally distribute RPCs across
// the backends as the fake grpclb server does not support load reporting from
// the clients.
now := time.Now()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
// Push a resolver update with the service config specifying "round_robin"
// through the recommended `loadBalancingConfig` field.
now = time.Now()
r.UpdateState(resolver.State{
Addresses: addrs,
ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
})
if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
// Push a resolver update with no service config and an address of type
// "grpclb". The ClientConn should continue to use the service config received
// earlier, which specified the use of "round_robin" through the
// `loadBalancingConfig` field, and therefore the balancer should not be
// switched. And because the `loadBalancingConfig` field trumps everything
// else, the address of type "grpclb" should be ignored.
grpclbAddr := resolver.Address{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB}
now = time.Now()
r.UpdateState(resolver.State{Addresses: append(addrs, grpclbAddr)})
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if err := checkForTraceEvent(sCtx, wantRoundRobinTraceDesc, now); err == nil {
t.Fatal("channel switched balancers when expected not to")
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
}

// TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose tests the scenario
// where the balancer being switched out calls RemoveSubConn() in its Close()
// method. Verifies that this sequence of calls doesn't lead to a deadlock.
func (s) TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose(t *testing.T) {
// Register a stub balancer which calls RemoveSubConn() from its Close().
scChan := make(chan balancer.SubConn, 1)
uccsCalled := make(chan struct{}, 1)
stub.Register(t.Name(), stub.BalancerFuncs{
UpdateClientConnState: func(data *stub.BalancerData, ccs balancer.ClientConnState) error {
sc, err := data.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
t.Errorf("failed to create subConn: %v", err)
}
scChan <- sc
close(uccsCalled)
return nil
},
Close: func(data *stub.BalancerData) {
data.ClientConn.RemoveSubConn(<-scChan)
},
})
r := manual.NewBuilderWithScheme("whatever")
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
// Push a resolver update specifying our stub balancer as the LB policy.
scpr := parseServiceConfig(t, r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, t.Name()))
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: "dummy-address"}},
ServiceConfig: scpr,
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for UpdateClientConnState to be called: %v", ctx.Err())
case <-uccsCalled:
}
// The following service config update will switch balancer from our stub
// balancer to pick_first. The former will be closed, which will call
// cc.RemoveSubConn() inline (this RemoveSubConn is not required by the API,
// but some balancers might do it).
//
// This is to make sure the cc.RemoveSubConn() from Close() doesn't cause a
// deadlock (e.g. trying to grab a mutex while it's already locked).
//
// Do it in a goroutine so this test will fail with a helpful message
// (though the goroutine will still leak).
done := make(chan struct{})
go func() {
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: "dummy-address"}},
ServiceConfig: parseServiceConfig(t, r, pickFirstServiceConfig),
})
close(done)
}()
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for resolver.UpdateState to finish: %v", ctx.Err())
case <-done:
}
}

@ -128,9 +128,9 @@ func checkPickFirst(ctx context.Context, cc *grpc.ClientConn, wantAddr string) e
return nil
}
-// backendsToAddrs is a helper routine to convert from a set of backends to
+// stubBackendsToResolverAddrs converts from a set of stub server backends to
// resolver addresses. Useful when pushing addresses to the manual resolver.
-func backendsToAddrs(backends []*stubserver.StubServer) []resolver.Address {
+func stubBackendsToResolverAddrs(backends []*stubserver.StubServer) []resolver.Address {
addrs := make([]resolver.Address, len(backends))
for i, backend := range backends {
addrs[i] = resolver.Address{Addr: backend.Address}
@ -143,7 +143,7 @@ func backendsToAddrs(backends []*stubserver.StubServer) []resolver.Address {
func (s) TestPickFirst_OneBackend(t *testing.T) {
cc, r, backends := setupPickFirst(t, 1)
-	addrs := backendsToAddrs(backends)
+	addrs := stubBackendsToResolverAddrs(backends)
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -158,7 +158,7 @@ func (s) TestPickFirst_OneBackend(t *testing.T) {
func (s) TestPickFirst_MultipleBackends(t *testing.T) {
cc, r, backends := setupPickFirst(t, 2)
-	addrs := backendsToAddrs(backends)
+	addrs := stubBackendsToResolverAddrs(backends)
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -174,7 +174,7 @@ func (s) TestPickFirst_MultipleBackends(t *testing.T) {
func (s) TestPickFirst_OneServerDown(t *testing.T) {
cc, r, backends := setupPickFirst(t, 2)
-	addrs := backendsToAddrs(backends)
+	addrs := stubBackendsToResolverAddrs(backends)
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -197,7 +197,7 @@ func (s) TestPickFirst_OneServerDown(t *testing.T) {
func (s) TestPickFirst_AllServersDown(t *testing.T) {
cc, r, backends := setupPickFirst(t, 2)
-	addrs := backendsToAddrs(backends)
+	addrs := stubBackendsToResolverAddrs(backends)
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -228,7 +228,7 @@ func (s) TestPickFirst_AllServersDown(t *testing.T) {
func (s) TestPickFirst_AddressesRemoved(t *testing.T) {
cc, r, backends := setupPickFirst(t, 3)
-	addrs := backendsToAddrs(backends)
+	addrs := stubBackendsToResolverAddrs(backends)
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -273,7 +273,7 @@ func (s) TestPickFirst_AddressesRemoved(t *testing.T) {
// backends are added, the RPC is able to complete.
func (s) TestPickFirst_NewAddressWhileBlocking(t *testing.T) {
cc, r, backends := setupPickFirst(t, 2)
-	addrs := backendsToAddrs(backends)
+	addrs := stubBackendsToResolverAddrs(backends)
r.UpdateState(resolver.State{Addresses: addrs})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)

@ -129,7 +129,7 @@ func (s) TestResolverUpdate_InvalidServiceConfigAsFirstUpdate(t *testing.T) {
// The wrappingBalancer wraps a pick_first balancer and writes to a channel when
// it receives a ClientConn update. This is different to a stub balancer which
-// only notifies of updates from grpc, but does not contain a real balanacer.
+// only notifies of updates from grpc, but does not contain a real balancer.
//
// The wrappingBalancer allows us to write tests with a real backend and make
// real RPCs.

@ -43,7 +43,8 @@ import (
const rrServiceConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}`
-func checkRoundRobin(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
+func checkRoundRobin(ctx context.Context, cc *grpc.ClientConn, addrs []resolver.Address) error {
+	client := testgrpc.NewTestServiceClient(cc)
var peer peer.Peer
// Make sure connections to all backends are up.
backendCount := len(addrs)
@ -126,7 +127,7 @@ func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOpt
}
r.UpdateState(resolver.State{Addresses: addrs})
-	if err := checkRoundRobin(ctx, client, addrs); err != nil {
+	if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
return cc, r, backends
@ -244,8 +245,7 @@ func (s) TestRoundRobin_OneServerDown(t *testing.T) {
for i := 0; i < len(backends)-1; i++ {
addrs[i] = resolver.Address{Addr: backends[i].Address}
}
-	client := testpb.NewTestServiceClient(cc)
-	if err := checkRoundRobin(ctx, client, addrs); err != nil {
+	if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatalf("RPCs are not being round robined across remaining servers: %v", err)
}
}
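The body of checkRoundRobin is not shown in this hunk. As a rough sketch of the peer-based pattern it relies on, the hypothetical helper below (not part of this commit; it assumes the context, fmt, grpc, peer, testgrpc and testpb imports already used in these test files) issues one EmptyCall per expected backend and records which peer served it:

// checkPeersSketch is a hypothetical helper (not the actual checkRoundRobin)
// that issues one RPC per expected backend and captures the serving peer via
// the grpc.Peer() call option.
func checkPeersSketch(ctx context.Context, cc *grpc.ClientConn, n int) ([]string, error) {
	client := testgrpc.NewTestServiceClient(cc)
	peers := make([]string, 0, n)
	for i := 0; i < n; i++ {
		var p peer.Peer
		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p)); err != nil {
			return nil, fmt.Errorf("EmptyCall() failed: %v", err)
		}
		// p.Addr is the backend that served this RPC; callers can compare the
		// collected addresses against the expected list to verify distribution.
		peers = append(peers, p.Addr.String())
	}
	return peers, nil
}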