From 191cc8e37b04f481b133619e3b9e77a7c5655aee Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 13 Dec 2018 10:54:42 -0800 Subject: [PATCH] grpclb: filter out grpclb addresses if balancer in use is not grpclb (#2509) --- balancer/balancer.go | 13 ++++++++ balancer/grpclb/grpclb.go | 1 + balancer/grpclb/grpclb_test.go | 22 ++------------ balancer_conn_wrappers.go | 22 ++++++++++++++ balancer_switching_test.go | 54 ++++++++++++++++++++++++++++++++-- internal/internal.go | 2 ++ 6 files changed, 93 insertions(+), 21 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index 1bf46aafe..317c2e728 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/internal" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" ) @@ -47,6 +48,18 @@ func Register(b Builder) { m[strings.ToLower(b.Name())] = b } +// unregisterForTesting deletes the balancer with the given name from the +// balancer map. +// +// This function is not thread-safe. +func unregisterForTesting(name string) { + delete(m, name) +} + +func init() { + internal.BalancerUnregister = unregisterForTesting +} + // Get returns the resolver builder registered with the given name. // Note that the compare is done in a case-insenstive fashion. // If no builder is register with the name, nil will be returned. diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index 789b9c433..735c18410 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -342,6 +342,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { var remoteBalancerAddrs, backendAddrs []resolver.Address for _, a := range addrs { if a.Type == resolver.GRPCLB { + a.Type = resolver.Backend remoteBalancerAddrs = append(remoteBalancerAddrs, a) } else { backendAddrs = append(backendAddrs, a) diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 7ad0f5568..39285cd5a 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -628,25 +628,10 @@ func TestBalancerDisconnects(t *testing.T) { t.Fatalf("No RPC sent to second backend after 1 second") } -type customGRPCLBBuilder struct { - balancer.Builder - name string -} - -func (b *customGRPCLBBuilder) Name() string { - return b.name -} - -const grpclbCustomFallbackName = "grpclb_with_custom_fallback_timeout" - -func init() { - balancer.Register(&customGRPCLBBuilder{ - Builder: newLBBuilderWithFallbackTimeout(100 * time.Millisecond), - name: grpclbCustomFallbackName, - }) -} - func TestFallback(t *testing.T) { + balancer.Register(newLBBuilderWithFallbackTimeout(100 * time.Millisecond)) + defer balancer.Register(newLBBuilder()) + defer leakcheck.Check(t) r, cleanup := manual.GenerateAndRegisterManualResolver() @@ -684,7 +669,6 @@ func TestFallback(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerName(grpclbCustomFallbackName), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 77b684775..7233ade29 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -178,6 +178,28 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co } func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) { + if ccb.cc.curBalancerName != grpclbName { + var containsGRPCLB bool + for _, a := range addrs { + if a.Type == resolver.GRPCLB { + containsGRPCLB = true + break + } + } + if containsGRPCLB { + // The current balancer is not grpclb, but addresses contain grpclb + // address. This means we failed to switch to grpclb, most likely + // because grpclb is not registered. Filter out all grpclb addresses + // from addrs before sending to balancer. + tempAddrs := make([]resolver.Address, 0, len(addrs)) + for _, a := range addrs { + if a.Type != resolver.GRPCLB { + tempAddrs = append(tempAddrs, a) + } + } + addrs = tempAddrs + } + } select { case <-ccb.resolverUpdateCh: default: diff --git a/balancer_switching_test.go b/balancer_switching_test.go index d36c8e96e..e017a291d 100644 --- a/balancer_switching_test.go +++ b/balancer_switching_test.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" _ "google.golang.org/grpc/grpclog/glogger" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/leakcheck" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -133,7 +134,7 @@ func TestSwitchBalancer(t *testing.T) { r, rcleanup := manual.GenerateAndRegisterManualResolver() defer rcleanup() - numServers := 2 + const numServers = 2 servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() @@ -165,7 +166,7 @@ func TestBalancerDialOption(t *testing.T) { r, rcleanup := manual.GenerateAndRegisterManualResolver() defer rcleanup() - numServers := 2 + const numServers = 2 servers, _, scleanup := startServers(t, numServers, math.MaxInt32) defer scleanup() @@ -467,3 +468,52 @@ func TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) { t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName) } } + +// Test that when switching to grpclb fails because grpclb is not registered, +// the fallback balancer will only get backend addresses, not the grpclb server +// address. +// +// The tests sends 3 server addresses (all backends) as resolved addresses, but +// claim the first one is grpclb server. The all RPCs should all be send to the +// other addresses, not the first one. +func TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) { + internal.BalancerUnregister("grpclb") + defer balancer.Register(&magicalLB{}) + + defer leakcheck.Check(t) + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() + + const numServers = 3 + servers, _, scleanup := startServers(t, numServers, math.MaxInt32) + defer scleanup() + + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + r.NewAddress([]resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}) + // The default balancer is pickfirst. + if err := checkPickFirst(cc, servers[1:]); err != nil { + t.Fatalf("check pickfirst returned non-nil error: %v", err) + } + // Try switching to grpclb by sending servers[0] as grpclb address. It's + // expected that servers[0] will be filtered out, so it will not be used by + // the balancer. + // + // If the filtering failed, servers[0] will be used for RPCs and the RPCs + // will succeed. The following checks will catch this and fail. + r.NewAddress([]resolver.Address{ + {Addr: servers[0].addr, Type: resolver.GRPCLB}, + {Addr: servers[1].addr}, {Addr: servers[2].addr}}) + // Still check for pickfirst, but only with server[1] and server[2]. + if err := checkPickFirst(cc, servers[1:]); err != nil { + t.Fatalf("check pickfirst returned non-nil error: %v", err) + } + // Switch to roundrobin, anc check against server[1] and server[2]. + cc.handleServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) + if err := checkRoundRobin(cc, servers[1:]); err != nil { + t.Fatalf("check roundrobin returned non-nil error: %v", err) + } +} diff --git a/internal/internal.go b/internal/internal.go index f8932b1d8..466f5d244 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -29,6 +29,8 @@ var ( WithResolverBuilder interface{} // func (resolver.Builder) grpc.DialOption // HealthCheckFunc is used to provide client-side LB channel health checking HealthCheckFunc func(ctx context.Context, newStream func() (interface{}, error), reportHealth func(bool), serviceName string) error + // BalancerUnregister is exported by package balancer to unregister a balancer. + BalancerUnregister func(name string) ) const (