diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index 193873b0f..59f38f904 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -28,10 +28,9 @@ import ( "sync" "time" - durationpb "github.com/golang/protobuf/ptypes/duration" "google.golang.org/grpc" "google.golang.org/grpc/balancer" - lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" + grpclbstate "google.golang.org/grpc/balancer/grpclb/state" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" @@ -39,6 +38,9 @@ import ( "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/resolver/dns" "google.golang.org/grpc/resolver" + + durationpb "github.com/golang/protobuf/ptypes/duration" + lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" ) const ( @@ -410,11 +412,6 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error lb.handleServiceConfig(gc) addrs := ccs.ResolverState.Addresses - if len(addrs) == 0 { - // There should be at least one address, either grpclb server or - // fallback. Empty address is not valid. - return balancer.ErrBadResolverState - } var remoteBalancerAddrs, backendAddrs []resolver.Address for _, a := range addrs { @@ -425,6 +422,17 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error backendAddrs = append(backendAddrs, a) } } + if sd := grpclbstate.Get(ccs.ResolverState); sd != nil { + // Override any balancer addresses provided via + // ccs.ResolverState.Addresses. + remoteBalancerAddrs = sd.BalancerAddresses + } + + if len(backendAddrs)+len(remoteBalancerAddrs) == 0 { + // There should be at least one address, either grpclb server or + // fallback. Empty address is not valid. + return balancer.ErrBadResolverState + } if len(remoteBalancerAddrs) == 0 { if lb.ccRemoteLB != nil { diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 19296ca3a..d701b6d21 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -31,11 +31,9 @@ import ( "testing" "time" - durationpb "github.com/golang/protobuf/ptypes/duration" "google.golang.org/grpc" "google.golang.org/grpc/balancer" - lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" - lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" + grpclbstate "google.golang.org/grpc/balancer/grpclb/state" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal/grpctest" @@ -44,6 +42,10 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/status" + + durationpb "github.com/golang/protobuf/ptypes/duration" + lbgrpc "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" + lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" testpb "google.golang.org/grpc/test/grpc_testing" ) @@ -390,6 +392,8 @@ func newLoadBalancer(numberOfBackends int, statsChan chan *lbpb.ClientStats) (ts return } +var grpclbConfig = `{"loadBalancingConfig": [{"grpclb": {}}]}` + func (s) TestGRPCLB(t *testing.T) { r, cleanup := manual.GenerateAndRegisterManualResolver() defer cleanup() @@ -422,13 +426,17 @@ func (s) TestGRPCLB(t *testing.T) { defer cc.Close() testC := testpb.NewTestServiceClient(cc) - r.UpdateState(resolver.State{Addresses: []resolver.Address{{ - Addr: tss.lbAddr, - Type: resolver.GRPCLB, - ServerName: lbServerName, - }}}) + rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(grpclbConfig)}, + &grpclbstate.State{BalancerAddresses: []resolver.Address{{ + Addr: tss.lbAddr, + Type: resolver.Backend, + ServerName: lbServerName, + }}}) + r.UpdateState(rs) - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } } diff --git a/balancer/grpclb/state/state.go b/balancer/grpclb/state/state.go new file mode 100644 index 000000000..a24264a34 --- /dev/null +++ b/balancer/grpclb/state/state.go @@ -0,0 +1,51 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package state declares grpclb types to be set by resolvers wishing to pass +// information to grpclb via resolver.State Attributes. +package state + +import ( + "google.golang.org/grpc/resolver" +) + +// keyType is the key to use for storing State in Attributes. +type keyType string + +const key = keyType("grpc.grpclb.state") + +// State contains gRPCLB-relevant data passed from the name resolver. +type State struct { + // BalancerAddresses contains the remote load balancer address(es). If + // set, overrides any resolver-provided addresses with Type of GRPCLB. + BalancerAddresses []resolver.Address +} + +// Set returns a copy of the provided state with attributes containing s. s's +// data should not be mutated after calling Set. +func Set(state resolver.State, s *State) resolver.State { + state.Attributes = state.Attributes.WithValues(key, s) + return state +} + +// Get returns the grpclb State in the resolver.State, or nil if not present. +// The returned data should not be mutated. +func Get(state resolver.State) *State { + s, _ := state.Attributes.Value(key).(*State) + return s +} diff --git a/internal/resolver/dns/dns_resolver.go b/internal/resolver/dns/dns_resolver.go index c368db62e..9d08dd8ab 100644 --- a/internal/resolver/dns/dns_resolver.go +++ b/internal/resolver/dns/dns_resolver.go @@ -32,6 +32,7 @@ import ( "sync" "time" + grpclbstate "google.golang.org/grpc/balancer/grpclb/state" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpcrand" @@ -251,7 +252,7 @@ func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) { return nil, fmt.Errorf("dns: error parsing A record IP address %v", a) } addr := ip + ":" + strconv.Itoa(int(s.Port)) - newAddrs = append(newAddrs, resolver.Address{Addr: addr, Type: resolver.GRPCLB, ServerName: s.Target}) + newAddrs = append(newAddrs, resolver.Address{Addr: addr, ServerName: s.Target}) } } return newAddrs, nil @@ -326,13 +327,15 @@ func (d *dnsResolver) lookup() (*resolver.State, error) { if hostErr != nil && (srvErr != nil || len(srv) == 0) { return nil, hostErr } - state := &resolver.State{ - Addresses: append(addrs, srv...), + + state := resolver.State{Addresses: addrs} + if len(srv) > 0 { + state = grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: srv}) } if !d.disableServiceConfig { state.ServiceConfig = d.lookupTXT() } - return state, nil + return &state, nil } // formatIP returns ok = false if addr is not a valid textual representation of an IP address. diff --git a/internal/resolver/dns/dns_resolver_test.go b/internal/resolver/dns/dns_resolver_test.go index b7b39a7f6..1c8469a27 100644 --- a/internal/resolver/dns/dns_resolver_test.go +++ b/internal/resolver/dns/dns_resolver_test.go @@ -30,6 +30,7 @@ import ( "testing" "time" + grpclbstate "google.golang.org/grpc/balancer/grpclb/state" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/leakcheck" "google.golang.org/grpc/resolver" @@ -725,11 +726,11 @@ func testDNSResolver(t *testing.T) { t.Fatalf("UpdateState not called after 2s; aborting") } if !reflect.DeepEqual(a.addrWant, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrWant) + t.Errorf("Resolved addresses of target: %q = %+v, want %+v", a.target, state.Addresses, a.addrWant) } sc := scFromState(state) if a.scWant != sc { - t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant) + t.Errorf("Resolved service config of target: %q = %+v, want %+v", a.target, sc, a.scWant) } r.Close() } @@ -742,45 +743,52 @@ func testDNSResolverWithSRV(t *testing.T) { }() defer leakcheck.Check(t) tests := []struct { - target string - addrWant []resolver.Address - scWant string + target string + addrWant []resolver.Address + grpclbAddrs []resolver.Address + scWant string }{ { "foo.bar.com", []resolver.Address{{Addr: "1.2.3.4" + colonDefaultPort}, {Addr: "5.6.7.8" + colonDefaultPort}}, + nil, generateSC("foo.bar.com"), }, { "foo.bar.com:1234", []resolver.Address{{Addr: "1.2.3.4:1234"}, {Addr: "5.6.7.8:1234"}}, + nil, generateSC("foo.bar.com"), }, { "srv.ipv4.single.fake", - []resolver.Address{{Addr: "2.4.6.8" + colonDefaultPort}, {Addr: "1.2.3.4:1234", Type: resolver.GRPCLB, ServerName: "ipv4.single.fake"}}, + []resolver.Address{{Addr: "2.4.6.8" + colonDefaultPort}}, + []resolver.Address{{Addr: "1.2.3.4:1234", ServerName: "ipv4.single.fake"}}, generateSC("srv.ipv4.single.fake"), }, { "srv.ipv4.multi.fake", + nil, []resolver.Address{ - {Addr: "1.2.3.4:1234", Type: resolver.GRPCLB, ServerName: "ipv4.multi.fake"}, - {Addr: "5.6.7.8:1234", Type: resolver.GRPCLB, ServerName: "ipv4.multi.fake"}, - {Addr: "9.10.11.12:1234", Type: resolver.GRPCLB, ServerName: "ipv4.multi.fake"}, + {Addr: "1.2.3.4:1234", ServerName: "ipv4.multi.fake"}, + {Addr: "5.6.7.8:1234", ServerName: "ipv4.multi.fake"}, + {Addr: "9.10.11.12:1234", ServerName: "ipv4.multi.fake"}, }, generateSC("srv.ipv4.multi.fake"), }, { "srv.ipv6.single.fake", - []resolver.Address{{Addr: "[2607:f8b0:400a:801::1001]:1234", Type: resolver.GRPCLB, ServerName: "ipv6.single.fake"}}, + nil, + []resolver.Address{{Addr: "[2607:f8b0:400a:801::1001]:1234", ServerName: "ipv6.single.fake"}}, generateSC("srv.ipv6.single.fake"), }, { "srv.ipv6.multi.fake", + nil, []resolver.Address{ - {Addr: "[2607:f8b0:400a:801::1001]:1234", Type: resolver.GRPCLB, ServerName: "ipv6.multi.fake"}, - {Addr: "[2607:f8b0:400a:801::1002]:1234", Type: resolver.GRPCLB, ServerName: "ipv6.multi.fake"}, - {Addr: "[2607:f8b0:400a:801::1003]:1234", Type: resolver.GRPCLB, ServerName: "ipv6.multi.fake"}, + {Addr: "[2607:f8b0:400a:801::1001]:1234", ServerName: "ipv6.multi.fake"}, + {Addr: "[2607:f8b0:400a:801::1002]:1234", ServerName: "ipv6.multi.fake"}, + {Addr: "[2607:f8b0:400a:801::1003]:1234", ServerName: "ipv6.multi.fake"}, }, generateSC("srv.ipv6.multi.fake"), }, @@ -807,11 +815,16 @@ func testDNSResolverWithSRV(t *testing.T) { t.Fatalf("UpdateState not called after 2s; aborting") } if !reflect.DeepEqual(a.addrWant, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrWant) + t.Errorf("Resolved addresses of target: %q = %+v, want %+v", a.target, state.Addresses, a.addrWant) + } + gs := grpclbstate.Get(state) + if (gs == nil && len(a.grpclbAddrs) > 0) || + (gs != nil && !reflect.DeepEqual(a.grpclbAddrs, gs.BalancerAddresses)) { + t.Errorf("Resolved state of target: %q = %+v (State=%+v), want state.Attributes.State=%+v", a.target, state, gs, a.grpclbAddrs) } sc := scFromState(state) if a.scWant != sc { - t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant) + t.Errorf("Resolved service config of target: %q = %+v, want %+v", a.target, sc, a.scWant) } } } @@ -879,11 +892,11 @@ func testDNSResolveNow(t *testing.T) { t.Fatalf("UpdateState not called after 2s; aborting. state=%v", state) } if !reflect.DeepEqual(a.addrWant, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrWant) + t.Errorf("Resolved addresses of target: %q = %+v, want %+v", a.target, state.Addresses, a.addrWant) } sc := scFromState(state) if a.scWant != sc { - t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant) + t.Errorf("Resolved service config of target: %q = %+v, want %+v", a.target, sc, a.scWant) } revertTbl := mutateTbl(a.target) @@ -900,10 +913,10 @@ func testDNSResolveNow(t *testing.T) { } sc = scFromState(state) if !reflect.DeepEqual(a.addrNext, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrNext) + t.Errorf("Resolved addresses of target: %q = %+v, want %+v", a.target, state.Addresses, a.addrNext) } if a.scNext != sc { - t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scNext) + t.Errorf("Resolved service config of target: %q = %+v, want %+v", a.target, sc, a.scNext) } revertTbl() } @@ -946,7 +959,7 @@ func testIPResolver(t *testing.T) { time.Sleep(time.Millisecond) } if !reflect.DeepEqual(v.want, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", v.target, state.Addresses, v.want) + t.Errorf("Resolved addresses of target: %q = %+v, want %+v", v.target, state.Addresses, v.want) } r.ResolveNow(resolver.ResolveNowOptions{}) for i := 0; i < 50; i++ { @@ -1039,7 +1052,7 @@ func TestDisableServiceConfig(t *testing.T) { } sc := scFromState(state) if a.scWant != sc { - t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant) + t.Errorf("Resolved service config of target: %q = %+v, want %+v", a.target, sc, a.scWant) } } } @@ -1098,7 +1111,7 @@ func TestDNSResolverRetry(t *testing.T) { } want := []resolver.Address{{Addr: "1.2.3.4" + colonDefaultPort}} if !reflect.DeepEqual(want, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, state.Addresses, want) + t.Errorf("Resolved addresses of target: %q = %+v, want %+v", target, state.Addresses, want) } // mutate the host lookup table so the target has 0 address returned. revertTbl := mutateTbl(target) @@ -1125,7 +1138,7 @@ func TestDNSResolverRetry(t *testing.T) { time.Sleep(time.Millisecond) } if !reflect.DeepEqual(want, state.Addresses) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, state.Addresses, want) + t.Errorf("Resolved addresses of target: %q = %+v, want %+v", target, state.Addresses, want) } } @@ -1330,7 +1343,7 @@ func TestRateLimitedResolve(t *testing.T) { time.Sleep(time.Millisecond) } if !reflect.DeepEqual(state.Addresses, wantAddrs) { - t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, state.Addresses, wantAddrs) + t.Errorf("Resolved addresses of target: %q = %+v, want %+v", target, state.Addresses, wantAddrs) } } diff --git a/resolver/resolver.go b/resolver/resolver.go index fe14b2fb9..379275a2d 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -85,7 +85,10 @@ const ( Backend AddressType = iota // GRPCLB indicates the address is for a grpclb load balancer. // - // Deprecated: use Attributes in Address instead. + // Deprecated: to select the GRPCLB load balancing policy, use a service + // config with a corresponding loadBalancingConfig. To supply balancer + // addresses to the GRPCLB load balancing policy, set State.Attributes + // using balancer/grpclb/state.Set. GRPCLB )