grpclb: filter out grpclb addresses if balancer in use is not grpclb (#2509)

This commit is contained in:
Menghan Li 2018-12-13 10:54:42 -08:00 committed by GitHub
parent ebf41aabc9
commit 191cc8e37b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 93 additions and 21 deletions

View File

@ -28,6 +28,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
)
@ -47,6 +48,18 @@ func Register(b Builder) {
m[strings.ToLower(b.Name())] = b
}
// unregisterForTesting deletes the balancer with the given name from the
// balancer map.
//
// This function is not thread-safe.
func unregisterForTesting(name string) {
delete(m, name)
}
func init() {
internal.BalancerUnregister = unregisterForTesting
}
// Get returns the resolver builder registered with the given name.
// Note that the compare is done in a case-insenstive fashion.
// If no builder is register with the name, nil will be returned.

View File

@ -342,6 +342,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
var remoteBalancerAddrs, backendAddrs []resolver.Address
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
a.Type = resolver.Backend
remoteBalancerAddrs = append(remoteBalancerAddrs, a)
} else {
backendAddrs = append(backendAddrs, a)

View File

@ -628,25 +628,10 @@ func TestBalancerDisconnects(t *testing.T) {
t.Fatalf("No RPC sent to second backend after 1 second")
}
type customGRPCLBBuilder struct {
balancer.Builder
name string
}
func (b *customGRPCLBBuilder) Name() string {
return b.name
}
const grpclbCustomFallbackName = "grpclb_with_custom_fallback_timeout"
func init() {
balancer.Register(&customGRPCLBBuilder{
Builder: newLBBuilderWithFallbackTimeout(100 * time.Millisecond),
name: grpclbCustomFallbackName,
})
}
func TestFallback(t *testing.T) {
balancer.Register(newLBBuilderWithFallbackTimeout(100 * time.Millisecond))
defer balancer.Register(newLBBuilder())
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
@ -684,7 +669,6 @@ func TestFallback(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
grpc.WithBalancerName(grpclbCustomFallbackName),
grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)

View File

@ -178,6 +178,28 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
}
func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) {
if ccb.cc.curBalancerName != grpclbName {
var containsGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
containsGRPCLB = true
break
}
}
if containsGRPCLB {
// The current balancer is not grpclb, but addresses contain grpclb
// address. This means we failed to switch to grpclb, most likely
// because grpclb is not registered. Filter out all grpclb addresses
// from addrs before sending to balancer.
tempAddrs := make([]resolver.Address, 0, len(addrs))
for _, a := range addrs {
if a.Type != resolver.GRPCLB {
tempAddrs = append(tempAddrs, a)
}
}
addrs = tempAddrs
}
}
select {
case <-ccb.resolverUpdateCh:
default:

View File

@ -29,6 +29,7 @@ import (
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
@ -133,7 +134,7 @@ func TestSwitchBalancer(t *testing.T) {
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
numServers := 2
const numServers = 2
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
@ -165,7 +166,7 @@ func TestBalancerDialOption(t *testing.T) {
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
numServers := 2
const numServers = 2
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
@ -467,3 +468,52 @@ func TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName)
}
}
// Test that when switching to grpclb fails because grpclb is not registered,
// the fallback balancer will only get backend addresses, not the grpclb server
// address.
//
// The tests sends 3 server addresses (all backends) as resolved addresses, but
// claim the first one is grpclb server. The all RPCs should all be send to the
// other addresses, not the first one.
func TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
internal.BalancerUnregister("grpclb")
defer balancer.Register(&magicalLB{})
defer leakcheck.Check(t)
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()
const numServers = 3
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}})
// The default balancer is pickfirst.
if err := checkPickFirst(cc, servers[1:]); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Try switching to grpclb by sending servers[0] as grpclb address. It's
// expected that servers[0] will be filtered out, so it will not be used by
// the balancer.
//
// If the filtering failed, servers[0] will be used for RPCs and the RPCs
// will succeed. The following checks will catch this and fail.
r.NewAddress([]resolver.Address{
{Addr: servers[0].addr, Type: resolver.GRPCLB},
{Addr: servers[1].addr}, {Addr: servers[2].addr}})
// Still check for pickfirst, but only with server[1] and server[2].
if err := checkPickFirst(cc, servers[1:]); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Switch to roundrobin, anc check against server[1] and server[2].
cc.handleServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
if err := checkRoundRobin(cc, servers[1:]); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
}

View File

@ -29,6 +29,8 @@ var (
WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption
// HealthCheckFunc is used to provide client-side LB channel health checking
HealthCheckFunc func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error
// BalancerUnregister is exported by package balancer to unregister a balancer.
BalancerUnregister func(name string)
)
const (