balancer/resolver: add loadBalancingConfig and pre-parsing support (#2732)

This commit is contained in:
Doug Fawley 2019-05-30 09:12:58 -07:00 committed by GitHub
parent 0435a4bb26
commit d40a995895
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 515 additions and 522 deletions

View File

@ -22,6 +22,7 @@ package balancer
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"net" "net"
"strings" "strings"
@ -31,6 +32,7 @@ import (
"google.golang.org/grpc/internal" "google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
) )
var ( var (
@ -39,7 +41,10 @@ var (
) )
// Register registers the balancer builder to the balancer map. b.Name // Register registers the balancer builder to the balancer map. b.Name
// (lowercased) will be used as the name registered with this builder. // (lowercased) will be used as the name registered with this builder. If the
// Builder implements ConfigParser, ParseConfig will be called when new service
// configs are received by the resolver, and the result will be provided to the
// Balancer in UpdateClientConnState.
// //
// NOTE: this function must only be called during initialization time (i.e. in // NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Balancers are // an init() function), and is not thread-safe. If multiple Balancers are
@ -172,6 +177,14 @@ type Builder interface {
Name() string Name() string
} }
// ConfigParser parses load balancer configs.
type ConfigParser interface {
// ParseConfig parses the JSON load balancer config provided into an
// internal form or returns an error if the config is invalid. For future
// compatibility reasons, unknown fields in the config should be ignored.
ParseConfig(LoadBalancingConfigJSON json.RawMessage) (serviceconfig.LoadBalancingConfig, error)
}
// PickOptions contains addition information for the Pick operation. // PickOptions contains addition information for the Pick operation.
type PickOptions struct { type PickOptions struct {
// FullMethodName is the method name that NewClientStream() is called // FullMethodName is the method name that NewClientStream() is called
@ -270,7 +283,7 @@ type Balancer interface {
// non-nil error to gRPC. // non-nil error to gRPC.
// //
// Deprecated: if V2Balancer is implemented by the Balancer, // Deprecated: if V2Balancer is implemented by the Balancer,
// UpdateResolverState will be called instead. // UpdateClientConnState will be called instead.
HandleResolvedAddrs([]resolver.Address, error) HandleResolvedAddrs([]resolver.Address, error)
// Close closes the balancer. The balancer is not required to call // Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns. // ClientConn.RemoveSubConn for its existing SubConns.
@ -283,14 +296,23 @@ type SubConnState struct {
// TODO: add last connection error // TODO: add last connection error
} }
// ClientConnState describes the state of a ClientConn relevant to the
// balancer.
type ClientConnState struct {
ResolverState resolver.State
// The parsed load balancing configuration returned by the builder's
// ParseConfig method, if implemented.
BalancerConfig serviceconfig.LoadBalancingConfig
}
// V2Balancer is defined for documentation purposes. If a Balancer also // V2Balancer is defined for documentation purposes. If a Balancer also
// implements V2Balancer, its UpdateResolverState method will be called instead // implements V2Balancer, its UpdateClientConnState method will be called
// of HandleResolvedAddrs and its UpdateSubConnState will be called instead of // instead of HandleResolvedAddrs and its UpdateSubConnState will be called
// HandleSubConnStateChange. // instead of HandleSubConnStateChange.
type V2Balancer interface { type V2Balancer interface {
// UpdateResolverState is called by gRPC when the state of the resolver // UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes. // changes.
UpdateResolverState(resolver.State) UpdateClientConnState(ClientConnState)
// UpdateSubConnState is called by gRPC when the state of a SubConn // UpdateSubConnState is called by gRPC when the state of a SubConn
// changes. // changes.
UpdateSubConnState(SubConn, SubConnState) UpdateSubConnState(SubConn, SubConnState)

View File

@ -70,13 +70,13 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
panic("not implemented") panic("not implemented")
} }
func (b *baseBalancer) UpdateResolverState(s resolver.State) { func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
// TODO: handle s.Err (log if not nil) once implemented. // TODO: handle s.ResolverState.Err (log if not nil) once implemented.
// TODO: handle s.ServiceConfig? // TODO: handle s.ResolverState.ServiceConfig?
grpclog.Infoln("base.baseBalancer: got new resolver state: ", s) grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
// addrsSet is the set converted from addrs, it's used for quick lookup of an address. // addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{}) addrsSet := make(map[resolver.Address]struct{})
for _, a := range s.Addresses { for _, a := range s.ResolverState.Addresses {
addrsSet[a] = struct{}{} addrsSet[a] = struct{}{}
if _, ok := b.subConns[a]; !ok { if _, ok := b.subConns[a]; !ok {
// a is a new address (not existing in b.subConns). // a is a new address (not existing in b.subConns).

View File

@ -408,11 +408,11 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
panic("not used") panic("not used")
} }
func (lb *lbBalancer) handleServiceConfig(sc string) { func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
lb.mu.Lock() lb.mu.Lock()
defer lb.mu.Unlock() defer lb.mu.Unlock()
newUsePickFirst := childIsPickFirst(sc) newUsePickFirst := childIsPickFirst(gc)
if lb.usePickFirst == newUsePickFirst { if lb.usePickFirst == newUsePickFirst {
return return
} }
@ -422,13 +422,14 @@ func (lb *lbBalancer) handleServiceConfig(sc string) {
lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst) lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
} }
func (lb *lbBalancer) UpdateResolverState(rs resolver.State) { func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
if grpclog.V(2) { if grpclog.V(2) {
grpclog.Infof("lbBalancer: UpdateResolverState: %+v", rs) grpclog.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
} }
lb.handleServiceConfig(rs.ServiceConfig) gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
lb.handleServiceConfig(gc)
addrs := rs.Addresses addrs := ccs.ResolverState.Addresses
if len(addrs) <= 0 { if len(addrs) <= 0 {
return return
} }

View File

@ -23,53 +23,32 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/serviceconfig"
) )
type serviceConfig struct {
LoadBalancingConfig *[]map[string]*grpclbServiceConfig
}
type grpclbServiceConfig struct {
ChildPolicy *[]map[string]json.RawMessage
}
func parseFullServiceConfig(s string) *serviceConfig {
var ret serviceConfig
err := json.Unmarshal([]byte(s), &ret)
if err != nil {
return nil
}
return &ret
}
func parseServiceConfig(s string) *grpclbServiceConfig {
parsedSC := parseFullServiceConfig(s)
if parsedSC == nil {
return nil
}
lbConfigs := parsedSC.LoadBalancingConfig
if lbConfigs == nil {
return nil
}
for _, lbC := range *lbConfigs {
if v, ok := lbC[grpclbName]; ok {
return v
}
}
return nil
}
const ( const (
roundRobinName = roundrobin.Name roundRobinName = roundrobin.Name
pickFirstName = grpc.PickFirstBalancerName pickFirstName = grpc.PickFirstBalancerName
) )
func childIsPickFirst(s string) bool { type grpclbServiceConfig struct {
parsedSC := parseServiceConfig(s) serviceconfig.LoadBalancingConfig
if parsedSC == nil { ChildPolicy *[]map[string]json.RawMessage
}
func (b *lbBuilder) ParseConfig(lbConfig json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
ret := &grpclbServiceConfig{}
if err := json.Unmarshal(lbConfig, ret); err != nil {
return nil, err
}
return ret, nil
}
func childIsPickFirst(sc *grpclbServiceConfig) bool {
if sc == nil {
return false return false
} }
childConfigs := parsedSC.ChildPolicy childConfigs := sc.ChildPolicy
if childConfigs == nil { if childConfigs == nil {
return false return false
} }

View File

@ -20,72 +20,31 @@ package grpclb
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt"
"reflect" "reflect"
"strings"
"testing" "testing"
"google.golang.org/grpc/serviceconfig"
) )
func Test_parseFullServiceConfig(t *testing.T) { func Test_Parse(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
s string s string
want *serviceConfig want serviceconfig.LoadBalancingConfig
wantErr error
}{ }{
{ {
name: "empty", name: "empty",
s: "", s: "",
want: nil, want: nil,
wantErr: errors.New("unexpected end of JSON input"),
}, },
{ {
name: "success1", name: "success1",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`, s: `{"childPolicy":[{"pick_first":{}}]}`,
want: &serviceConfig{
LoadBalancingConfig: &[]map[string]*grpclbServiceConfig{
{"grpclb": &grpclbServiceConfig{
ChildPolicy: &[]map[string]json.RawMessage{
{"pick_first": json.RawMessage("{}")},
},
}},
},
},
},
{
name: "success2",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`,
want: &serviceConfig{
LoadBalancingConfig: &[]map[string]*grpclbServiceConfig{
{"grpclb": &grpclbServiceConfig{
ChildPolicy: &[]map[string]json.RawMessage{
{"round_robin": json.RawMessage("{}")},
{"pick_first": json.RawMessage("{}")},
},
}},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := parseFullServiceConfig(tt.s); !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseFullServiceConfig() = %+v, want %+v", got, tt.want)
}
})
}
}
func Test_parseServiceConfig(t *testing.T) {
tests := []struct {
name string
s string
want *grpclbServiceConfig
}{
{
name: "empty",
s: "",
want: nil,
},
{
name: "success1",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`,
want: &grpclbServiceConfig{ want: &grpclbServiceConfig{
ChildPolicy: &[]map[string]json.RawMessage{ ChildPolicy: &[]map[string]json.RawMessage{
{"pick_first": json.RawMessage("{}")}, {"pick_first": json.RawMessage("{}")},
@ -94,7 +53,7 @@ func Test_parseServiceConfig(t *testing.T) {
}, },
{ {
name: "success2", name: "success2",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`, s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`,
want: &grpclbServiceConfig{ want: &grpclbServiceConfig{
ChildPolicy: &[]map[string]json.RawMessage{ ChildPolicy: &[]map[string]json.RawMessage{
{"round_robin": json.RawMessage("{}")}, {"round_robin": json.RawMessage("{}")},
@ -102,16 +61,11 @@ func Test_parseServiceConfig(t *testing.T) {
}, },
}, },
}, },
{
name: "no_grpclb",
s: `{"loadBalancingConfig":[{"notgrpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`,
want: nil,
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
if got := parseServiceConfig(tt.s); !reflect.DeepEqual(got, tt.want) { if got, err := (&lbBuilder{}).ParseConfig(json.RawMessage(tt.s)); !reflect.DeepEqual(got, tt.want) || !strings.Contains(fmt.Sprint(err), fmt.Sprint(tt.wantErr)) {
t.Errorf("parseFullServiceConfig() = %+v, want %+v", got, tt.want) t.Errorf("parseFullServiceConfig() = %+v, %+v, want %+v, <contains %q>", got, err, tt.want, tt.wantErr)
} }
}) })
} }
@ -123,30 +77,29 @@ func Test_childIsPickFirst(t *testing.T) {
s string s string
want bool want bool
}{ }{
{
name: "invalid",
s: "",
want: false,
},
{ {
name: "pickfirst_only", name: "pickfirst_only",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`, s: `{"childPolicy":[{"pick_first":{}}]}`,
want: true, want: true,
}, },
{ {
name: "pickfirst_before_rr", name: "pickfirst_before_rr",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}},{"round_robin":{}}]}}]}`, s: `{"childPolicy":[{"pick_first":{}},{"round_robin":{}}]}`,
want: true, want: true,
}, },
{ {
name: "rr_before_pickfirst", name: "rr_before_pickfirst",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`, s: `{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`,
want: false, want: false,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
if got := childIsPickFirst(tt.s); got != tt.want { gc, err := (&lbBuilder{}).ParseConfig(json.RawMessage(tt.s))
if err != nil {
t.Fatalf("Parse(%v) = _, %v; want _, nil", tt.s, err)
}
if got := childIsPickFirst(gc.(*grpclbServiceConfig)); got != tt.want {
t.Errorf("childIsPickFirst() = %v, want %v", got, tt.want) t.Errorf("childIsPickFirst() = %v, want %v", got, tt.want)
} }
}) })

View File

@ -44,6 +44,7 @@ import (
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing" testpb "google.golang.org/grpc/test/grpc_testing"
) )
@ -808,7 +809,11 @@ func TestFallback(t *testing.T) {
} }
func TestGRPCLBPickFirst(t *testing.T) { func TestGRPCLBPickFirst(t *testing.T) {
const grpclbServiceConfigWithPickFirst = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}` const pfc = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`
svcCfg, err := serviceconfig.Parse(pfc)
if err != nil {
t.Fatalf("Error parsing config %q: %v", pfc, err)
}
defer leakcheck.Check(t) defer leakcheck.Check(t)
@ -866,7 +871,7 @@ func TestGRPCLBPickFirst(t *testing.T) {
Type: resolver.GRPCLB, Type: resolver.GRPCLB,
ServerName: lbServerName, ServerName: lbServerName,
}}, }},
ServiceConfig: grpclbServiceConfigWithPickFirst, ServiceConfig: svcCfg,
}) })
result = "" result = ""
@ -905,6 +910,10 @@ func TestGRPCLBPickFirst(t *testing.T) {
} }
// Switch sub policy to roundrobin. // Switch sub policy to roundrobin.
grpclbServiceConfigEmpty, err := serviceconfig.Parse(`{}`)
if err != nil {
t.Fatalf("Error parsing config %q: %v", grpclbServiceConfigEmpty, err)
}
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{ Addresses: []resolver.Address{{
@ -912,7 +921,7 @@ func TestGRPCLBPickFirst(t *testing.T) {
Type: resolver.GRPCLB, Type: resolver.GRPCLB,
ServerName: lbServerName, ServerName: lbServerName,
}}, }},
ServiceConfig: `{}`, ServiceConfig: grpclbServiceConfigEmpty,
}) })
result = "" result = ""

View File

@ -192,7 +192,7 @@ func (bg *balancerGroup) handleResolvedAddrs(id string, addrs []resolver.Address
return return
} }
if ub, ok := b.(balancer.V2Balancer); ok { if ub, ok := b.(balancer.V2Balancer); ok {
ub.UpdateResolverState(resolver.State{Addresses: addrs}) ub.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}})
} else { } else {
b.HandleResolvedAddrs(addrs, nil) b.HandleResolvedAddrs(addrs, nil)
} }

View File

@ -39,6 +39,7 @@ import (
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
) )
const ( const (
@ -92,6 +93,14 @@ func (b *xdsBalancerBuilder) Name() string {
return xdsName return xdsName
} }
func (x *xdsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg xdsConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, fmt.Errorf("unable to unmarshal balancer config %s into xds config", string(c))
}
return &cfg, nil
}
// edsBalancerInterface defines the interface that edsBalancer must implement to // edsBalancerInterface defines the interface that edsBalancer must implement to
// communicate with xdsBalancer. // communicate with xdsBalancer.
// //
@ -219,26 +228,6 @@ func (x *xdsBalancer) run() {
} }
} }
func getBalancerConfig(serviceConfig string) *xdsConfig {
sc := parseFullServiceConfig(serviceConfig)
if sc == nil {
return nil
}
var xdsConfigRaw json.RawMessage
for _, lbcfg := range sc.LoadBalancingConfig {
if lbcfg.Name == xdsName {
xdsConfigRaw = lbcfg.Config
break
}
}
var cfg xdsConfig
if err := json.Unmarshal(xdsConfigRaw, &cfg); err != nil {
grpclog.Warningf("unable to unmarshal balancer config %s into xds config", string(xdsConfigRaw))
return nil
}
return &cfg
}
func (x *xdsBalancer) handleGRPCUpdate(update interface{}) { func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
switch u := update.(type) { switch u := update.(type) {
case *subConnStateUpdate: case *subConnStateUpdate:
@ -252,11 +241,10 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
x.fallbackLB.HandleSubConnStateChange(u.sc, u.state.ConnectivityState) x.fallbackLB.HandleSubConnStateChange(u.sc, u.state.ConnectivityState)
} }
} }
case *resolver.State: case *balancer.ClientConnState:
cfg := getBalancerConfig(u.ServiceConfig) cfg, _ := u.BalancerConfig.(*xdsConfig)
if cfg == nil { if cfg == nil {
// service config parsing failed. should never happen. And this parsing will be removed, once // service config parsing failed. should never happen.
// we support service config validation.
return return
} }
@ -268,7 +256,7 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
x.startNewXDSClient(cfg) x.startNewXDSClient(cfg)
x.config = cfg x.config = cfg
x.fallbackInitData = &resolver.State{ x.fallbackInitData = &resolver.State{
Addresses: u.Addresses, Addresses: u.ResolverState.Addresses,
// TODO(yuxuanli): get the fallback balancer config once the validation change completes, where // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where
// we can pass along the config struct. // we can pass along the config struct.
} }
@ -294,15 +282,15 @@ func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
} }
} }
if x.fallbackLB != nil && (!reflect.DeepEqual(x.fallbackInitData.Addresses, u.Addresses) || fallbackChanged) { if x.fallbackLB != nil && (!reflect.DeepEqual(x.fallbackInitData.Addresses, u.ResolverState.Addresses) || fallbackChanged) {
x.updateFallbackWithResolverState(&resolver.State{ x.updateFallbackWithResolverState(&resolver.State{
Addresses: u.Addresses, Addresses: u.ResolverState.Addresses,
}) })
} }
x.config = cfg x.config = cfg
x.fallbackInitData = &resolver.State{ x.fallbackInitData = &resolver.State{
Addresses: u.Addresses, Addresses: u.ResolverState.Addresses,
// TODO(yuxuanli): get the fallback balancer config once the validation change completes, where // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where
// we can pass along the config struct. // we can pass along the config struct.
} }
@ -416,20 +404,7 @@ func (x *xdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Sub
} }
} }
type serviceConfig struct { func (x *xdsBalancer) UpdateClientConnState(s balancer.ClientConnState) {
LoadBalancingConfig []*loadBalancingConfig
}
func parseFullServiceConfig(s string) *serviceConfig {
var ret serviceConfig
err := json.Unmarshal([]byte(s), &ret)
if err != nil {
return nil
}
return &ret
}
func (x *xdsBalancer) UpdateResolverState(s resolver.State) {
select { select {
case x.grpcUpdate <- &s: case x.grpcUpdate <- &s:
case <-x.ctx.Done(): case <-x.ctx.Done():
@ -497,11 +472,11 @@ func (x *xdsBalancer) switchFallback() {
func (x *xdsBalancer) updateFallbackWithResolverState(s *resolver.State) { func (x *xdsBalancer) updateFallbackWithResolverState(s *resolver.State) {
if lb, ok := x.fallbackLB.(balancer.V2Balancer); ok { if lb, ok := x.fallbackLB.(balancer.V2Balancer); ok {
lb.UpdateResolverState(resolver.State{ lb.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{
Addresses: s.Addresses, Addresses: s.Addresses,
// TODO(yuxuanli): get the fallback balancer config once the validation change completes, where // TODO(yuxuanli): get the fallback balancer config once the validation change completes, where
// we can pass along the config struct. // we can pass along the config struct.
}) }})
} else { } else {
x.fallbackLB.HandleResolvedAddrs(s.Addresses, nil) x.fallbackLB.HandleResolvedAddrs(s.Addresses, nil)
} }
@ -540,6 +515,7 @@ func (x *xdsBalancer) buildFallBackBalancer(c *xdsConfig) {
// builder will always be non-nil, since when parse JSON into xdsConfig, we check whether the specified // builder will always be non-nil, since when parse JSON into xdsConfig, we check whether the specified
// balancer is registered or not. // balancer is registered or not.
builder := balancer.Get(c.FallBackPolicy.Name) builder := balancer.Get(c.FallBackPolicy.Name)
x.fallbackLB = builder.Build(x.cc, x.buildOpts) x.fallbackLB = builder.Build(x.cc, x.buildOpts)
} }
@ -598,6 +574,7 @@ func createDrainedTimer() *time.Timer {
} }
type xdsConfig struct { type xdsConfig struct {
serviceconfig.LoadBalancingConfig
BalancerName string BalancerName string
ChildPolicy *loadBalancingConfig ChildPolicy *loadBalancingConfig
FallBackPolicy *loadBalancingConfig FallBackPolicy *loadBalancingConfig

View File

@ -114,13 +114,11 @@ func (s) TestXdsLoadReporting(t *testing.T) {
Nanos: intervalNano, Nanos: intervalNano,
} }
cfg := &testBalancerConfig{ cfg := &xdsConfig{
BalancerName: addr, BalancerName: addr,
ChildPolicy: []lbPolicy{fakeBalancerA}, // Set this to skip cds. ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA}, // Set this to skip cds.
} }
lb.UpdateResolverState(resolver.State{ lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
ServiceConfig: constructServiceConfigFromXdsConfig(cfg),
})
td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
var ( var (
i int i int

View File

@ -38,10 +38,9 @@ import (
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
) )
var lbABuilder *balancerABuilder var lbABuilder = &balancerABuilder{}
func init() { func init() {
lbABuilder = &balancerABuilder{}
balancer.Register(lbABuilder) balancer.Register(lbABuilder)
balancer.Register(&balancerBBuilder{}) balancer.Register(&balancerBBuilder{})
} }
@ -56,21 +55,19 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{}) grpctest.RunSubTests(t, s{})
} }
type lbPolicy string
const ( const (
fakeBalancerA lbPolicy = "fake_balancer_A" fakeBalancerA = "fake_balancer_A"
fakeBalancerB lbPolicy = "fake_balancer_B" fakeBalancerB = "fake_balancer_B"
fakeBalancerC lbPolicy = "fake_balancer_C" fakeBalancerC = "fake_balancer_C"
) )
var ( var (
testBalancerNameFooBar = "foo.bar" testBalancerNameFooBar = "foo.bar"
testServiceConfigFooBar = constructServiceConfigFromXdsConfig(&testBalancerConfig{ testLBConfigFooBar = &xdsConfig{
BalancerName: testBalancerNameFooBar, BalancerName: testBalancerNameFooBar,
ChildPolicy: []lbPolicy{fakeBalancerA}, ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallbackPolicy: []lbPolicy{fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}) }
specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"} specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"}
specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"} specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"}
@ -80,36 +77,6 @@ var (
latestFakeEdsBalancer *fakeEDSBalancer latestFakeEdsBalancer *fakeEDSBalancer
) )
type testBalancerConfig struct {
BalancerName string `json:"balancerName,omitempty"`
ChildPolicy []lbPolicy `json:"childPolicy,omitempty"`
FallbackPolicy []lbPolicy `json:"fallbackPolicy,omitempty"`
}
func (l *lbPolicy) UnmarshalJSON(b []byte) error {
// no need to implement, not used.
return nil
}
func (l lbPolicy) MarshalJSON() ([]byte, error) {
m := make(map[string]struct{})
m[string(l)] = struct{}{}
return json.Marshal(m)
}
func constructServiceConfigFromXdsConfig(xdsCfg *testBalancerConfig) string {
cfgRaw, _ := json.Marshal(xdsCfg)
sc, _ := json.Marshal(&serviceConfig{
LoadBalancingConfig: []*loadBalancingConfig{
{
Name: xdsName,
Config: cfgRaw,
},
},
})
return string(sc)
}
type balancerABuilder struct { type balancerABuilder struct {
mu sync.Mutex mu sync.Mutex
lastBalancer *balancerA lastBalancer *balancerA
@ -283,9 +250,9 @@ func (s) TestXdsBalanceHandleResolvedAddrs(t *testing.T) {
defer lb.Close() defer lb.Close()
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
lb.UpdateResolverState(resolver.State{ lb.UpdateClientConnState(balancer.ClientConnState{
Addresses: addrs, ResolverState: resolver.State{Addresses: addrs},
ServiceConfig: string(testServiceConfigFooBar), BalancerConfig: testLBConfigFooBar,
}) })
select { select {
case nsc := <-cc.newSubConns: case nsc := <-cc.newSubConns:
@ -316,9 +283,9 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
} }
defer lb.Close() defer lb.Close()
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
lb.UpdateResolverState(resolver.State{ lb.UpdateClientConnState(balancer.ClientConnState{
Addresses: addrs, ResolverState: resolver.State{Addresses: addrs},
ServiceConfig: string(testServiceConfigFooBar), BalancerConfig: testLBConfigFooBar,
}) })
// verify fallback takes over // verify fallback takes over
@ -342,14 +309,14 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
addr, td, _, cleanup := setupServer(t) addr, td, _, cleanup := setupServer(t)
cleanups = append(cleanups, cleanup) cleanups = append(cleanups, cleanup)
workingServiceConfig := constructServiceConfigFromXdsConfig(&testBalancerConfig{ workingLBConfig := &xdsConfig{
BalancerName: addr, BalancerName: addr,
ChildPolicy: []lbPolicy{fakeBalancerA}, ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallbackPolicy: []lbPolicy{fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
}) }
lb.UpdateResolverState(resolver.State{ lb.UpdateClientConnState(balancer.ClientConnState{
Addresses: addrs, ResolverState: resolver.State{Addresses: addrs},
ServiceConfig: string(workingServiceConfig), BalancerConfig: workingLBConfig,
}) })
td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
@ -398,13 +365,16 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
} }
}() }()
for _, test := range []struct { for _, test := range []struct {
cfg *testBalancerConfig cfg *xdsConfig
responseToSend *discoverypb.DiscoveryResponse responseToSend *discoverypb.DiscoveryResponse
expectedChildPolicy *loadBalancingConfig expectedChildPolicy *loadBalancingConfig
}{ }{
{ {
cfg: &testBalancerConfig{ cfg: &xdsConfig{
ChildPolicy: []lbPolicy{fakeBalancerA}, ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerA,
Config: json.RawMessage("{}"),
},
}, },
responseToSend: testEDSRespWithoutEndpoints, responseToSend: testEDSRespWithoutEndpoints,
expectedChildPolicy: &loadBalancingConfig{ expectedChildPolicy: &loadBalancingConfig{
@ -413,8 +383,11 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
}, },
}, },
{ {
cfg: &testBalancerConfig{ cfg: &xdsConfig{
ChildPolicy: []lbPolicy{fakeBalancerB}, ChildPolicy: &loadBalancingConfig{
Name: fakeBalancerB,
Config: json.RawMessage("{}"),
},
}, },
expectedChildPolicy: &loadBalancingConfig{ expectedChildPolicy: &loadBalancingConfig{
Name: string(fakeBalancerB), Name: string(fakeBalancerB),
@ -422,7 +395,7 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
}, },
}, },
{ {
cfg: &testBalancerConfig{}, cfg: &xdsConfig{},
responseToSend: testCDSResp, responseToSend: testCDSResp,
expectedChildPolicy: &loadBalancingConfig{ expectedChildPolicy: &loadBalancingConfig{
Name: "ROUND_ROBIN", Name: "ROUND_ROBIN",
@ -433,9 +406,7 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
cleanups = append(cleanups, cleanup) cleanups = append(cleanups, cleanup)
test.cfg.BalancerName = addr test.cfg.BalancerName = addr
lb.UpdateResolverState(resolver.State{ lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: test.cfg})
ServiceConfig: constructServiceConfigFromXdsConfig(test.cfg),
})
if test.responseToSend != nil { if test.responseToSend != nil {
td.sendResp(&response{resp: test.responseToSend}) td.sendResp(&response{resp: test.responseToSend})
} }
@ -462,7 +433,7 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
// not in fallback mode, overwrite fallback info. // not in fallback mode, overwrite fallback info.
// in fallback mode, update config or switch balancer. // in fallback mode, update config or switch balancer.
func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) { func (s) TestXdsBalanceHandleBalancerConfigFallBackUpdate(t *testing.T) {
originalNewEDSBalancer := newEDSBalancer originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer newEDSBalancer = newFakeEDSBalancer
defer func() { defer func() {
@ -479,20 +450,19 @@ func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) {
addr, td, _, cleanup := setupServer(t) addr, td, _, cleanup := setupServer(t)
cfg := &testBalancerConfig{ cfg := xdsConfig{
BalancerName: addr, BalancerName: addr,
ChildPolicy: []lbPolicy{fakeBalancerA}, ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallbackPolicy: []lbPolicy{fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
} }
lb.UpdateResolverState(resolver.State{ lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: &cfg})
ServiceConfig: constructServiceConfigFromXdsConfig(cfg),
})
addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
cfg.FallbackPolicy = []lbPolicy{fakeBalancerB} cfg2 := cfg
lb.UpdateResolverState(resolver.State{ cfg2.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerB}
Addresses: addrs, lb.UpdateClientConnState(balancer.ClientConnState{
ServiceConfig: constructServiceConfigFromXdsConfig(cfg), ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &cfg2,
}) })
td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
@ -520,10 +490,11 @@ func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) {
t.Fatalf("timeout when geting new subconn result") t.Fatalf("timeout when geting new subconn result")
} }
cfg.FallbackPolicy = []lbPolicy{fakeBalancerA} cfg3 := cfg
lb.UpdateResolverState(resolver.State{ cfg3.FallBackPolicy = &loadBalancingConfig{Name: fakeBalancerA}
Addresses: addrs, lb.UpdateClientConnState(balancer.ClientConnState{
ServiceConfig: constructServiceConfigFromXdsConfig(cfg), ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &cfg3,
}) })
// verify fallback balancer A takes over // verify fallback balancer A takes over
@ -554,14 +525,12 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
addr, td, _, cleanup := setupServer(t) addr, td, _, cleanup := setupServer(t)
defer cleanup() defer cleanup()
cfg := &testBalancerConfig{ cfg := &xdsConfig{
BalancerName: addr, BalancerName: addr,
ChildPolicy: []lbPolicy{fakeBalancerA}, ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallbackPolicy: []lbPolicy{fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
} }
lb.UpdateResolverState(resolver.State{ lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
ServiceConfig: constructServiceConfigFromXdsConfig(cfg),
})
td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
@ -617,7 +586,7 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
} }
} }
func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) { func (s) TestXdsBalancerFallBackSignalFromEdsBalancer(t *testing.T) {
originalNewEDSBalancer := newEDSBalancer originalNewEDSBalancer := newEDSBalancer
newEDSBalancer = newFakeEDSBalancer newEDSBalancer = newFakeEDSBalancer
defer func() { defer func() {
@ -634,14 +603,12 @@ func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) {
addr, td, _, cleanup := setupServer(t) addr, td, _, cleanup := setupServer(t)
defer cleanup() defer cleanup()
cfg := &testBalancerConfig{ cfg := &xdsConfig{
BalancerName: addr, BalancerName: addr,
ChildPolicy: []lbPolicy{fakeBalancerA}, ChildPolicy: &loadBalancingConfig{Name: fakeBalancerA},
FallbackPolicy: []lbPolicy{fakeBalancerA}, FallBackPolicy: &loadBalancingConfig{Name: fakeBalancerA},
} }
lb.UpdateResolverState(resolver.State{ lb.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: cfg})
ServiceConfig: constructServiceConfigFromXdsConfig(cfg),
})
td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
@ -698,16 +665,16 @@ func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) {
} }
func (s) TestXdsBalancerConfigParsingSelectingLBPolicy(t *testing.T) { func (s) TestXdsBalancerConfigParsingSelectingLBPolicy(t *testing.T) {
tesCfg := &testBalancerConfig{ js := json.RawMessage(`{
BalancerName: "fake.foo.bar", "balancerName": "fake.foo.bar",
ChildPolicy: []lbPolicy{fakeBalancerC, fakeBalancerA, fakeBalancerB}, // selects fakeBalancerA "childPolicy": [{"fake_balancer_C": {}}, {"fake_balancer_A": {}}, {"fake_balancer_B": {}}],
FallbackPolicy: []lbPolicy{fakeBalancerC, fakeBalancerB, fakeBalancerA}, // selects fakeBalancerB "fallbackPolicy": [{"fake_balancer_C": {}}, {"fake_balancer_B": {}}, {"fake_balancer_A": {}}]
} }`)
js, _ := json.Marshal(tesCfg) cfg, err := (&xdsBalancerBuilder{}).ParseConfig(js)
var xdsCfg xdsConfig if err != nil {
if err := json.Unmarshal(js, &xdsCfg); err != nil { t.Fatalf("unable to unmarshal balancer config into xds config: %v", err)
t.Fatal("unable to unmarshal balancer config into xds config")
} }
xdsCfg := cfg.(*xdsConfig)
wantChildPolicy := &loadBalancingConfig{Name: string(fakeBalancerA), Config: json.RawMessage(`{}`)} wantChildPolicy := &loadBalancingConfig{Name: string(fakeBalancerA), Config: json.RawMessage(`{}`)}
if !reflect.DeepEqual(xdsCfg.ChildPolicy, wantChildPolicy) { if !reflect.DeepEqual(xdsCfg.ChildPolicy, wantChildPolicy) {
t.Fatalf("got child policy %v, want %v", xdsCfg.ChildPolicy, wantChildPolicy) t.Fatalf("got child policy %v, want %v", xdsCfg.ChildPolicy, wantChildPolicy)
@ -718,45 +685,6 @@ func (s) TestXdsBalancerConfigParsingSelectingLBPolicy(t *testing.T) {
} }
} }
func (s) TestXdsFullServiceConfigParsing(t *testing.T) {
tests := []struct {
name string
s string
want *serviceConfig
}{
{
name: "empty",
s: "",
want: nil,
},
{
name: "success1",
s: `{"loadBalancingConfig":[{"xds":{"childPolicy":[{"pick_first":{}}]}}]}`,
want: &serviceConfig{
LoadBalancingConfig: []*loadBalancingConfig{
{"xds", json.RawMessage(`{"childPolicy":[{"pick_first":{}}]}`)},
},
},
},
{
name: "success2",
s: `{"loadBalancingConfig":[{"xds":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`,
want: &serviceConfig{
LoadBalancingConfig: []*loadBalancingConfig{
{"xds", json.RawMessage(`{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`)},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := parseFullServiceConfig(tt.s); !reflect.DeepEqual(got, tt.want) {
t.Errorf("test name: %s, parseFullServiceConfig() = %+v, want %+v", tt.name, got, tt.want)
}
})
}
}
func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) { func (s) TestXdsLoadbalancingConfigParsing(t *testing.T) {
tests := []struct { tests := []struct {
name string name string

View File

@ -88,7 +88,7 @@ type ccBalancerWrapper struct {
cc *ClientConn cc *ClientConn
balancer balancer.Balancer balancer balancer.Balancer
stateChangeQueue *scStateUpdateBuffer stateChangeQueue *scStateUpdateBuffer
resolverUpdateCh chan *resolver.State ccUpdateCh chan *balancer.ClientConnState
done chan struct{} done chan struct{}
mu sync.Mutex mu sync.Mutex
@ -99,7 +99,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
ccb := &ccBalancerWrapper{ ccb := &ccBalancerWrapper{
cc: cc, cc: cc,
stateChangeQueue: newSCStateUpdateBuffer(), stateChangeQueue: newSCStateUpdateBuffer(),
resolverUpdateCh: make(chan *resolver.State, 1), ccUpdateCh: make(chan *balancer.ClientConnState, 1),
done: make(chan struct{}), done: make(chan struct{}),
subConns: make(map[*acBalancerWrapper]struct{}), subConns: make(map[*acBalancerWrapper]struct{}),
} }
@ -126,7 +126,7 @@ func (ccb *ccBalancerWrapper) watcher() {
} else { } else {
ccb.balancer.HandleSubConnStateChange(t.sc, t.state) ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
} }
case s := <-ccb.resolverUpdateCh: case s := <-ccb.ccUpdateCh:
select { select {
case <-ccb.done: case <-ccb.done:
ccb.balancer.Close() ccb.balancer.Close()
@ -134,9 +134,9 @@ func (ccb *ccBalancerWrapper) watcher() {
default: default:
} }
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok { if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateResolverState(*s) ub.UpdateClientConnState(*s)
} else { } else {
ccb.balancer.HandleResolvedAddrs(s.Addresses, nil) ccb.balancer.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
} }
case <-ccb.done: case <-ccb.done:
} }
@ -155,6 +155,7 @@ func (ccb *ccBalancerWrapper) watcher() {
return return
default: default:
} }
ccb.cc.firstResolveEvent.Fire()
} }
} }
@ -179,9 +180,10 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
}) })
} }
func (ccb *ccBalancerWrapper) updateResolverState(s resolver.State) { func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) {
if ccb.cc.curBalancerName != grpclbName { if ccb.cc.curBalancerName != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer. // Filter any grpclb addresses since we don't have the grpclb balancer.
s := ccs.ResolverState
for i := 0; i < len(s.Addresses); { for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB { if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:]) copy(s.Addresses[i:], s.Addresses[i+1:])
@ -192,10 +194,10 @@ func (ccb *ccBalancerWrapper) updateResolverState(s resolver.State) {
} }
} }
select { select {
case <-ccb.resolverUpdateCh: case <-ccb.ccUpdateCh:
default: default:
} }
ccb.resolverUpdateCh <- &s ccb.ccUpdateCh <- ccs
} }
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {

View File

@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/internal" "google.golang.org/grpc/internal"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
) )
var _ balancer.Builder = &magicalLB{} var _ balancer.Builder = &magicalLB{}
@ -148,12 +149,12 @@ func (s) TestSwitchBalancer(t *testing.T) {
t.Fatalf("check pickfirst returned non-nil error: %v", err) t.Fatalf("check pickfirst returned non-nil error: %v", err)
} }
// Switch to roundrobin. // Switch to roundrobin.
cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`, Addresses: addrs}) cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs})
if err := checkRoundRobin(cc, servers); err != nil { if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err) t.Fatalf("check roundrobin returned non-nil error: %v", err)
} }
// Switch to pickfirst. // Switch to pickfirst.
cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "pick_first"}`, Addresses: addrs}) cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs})
if err := checkPickFirst(cc, servers); err != nil { if err := checkPickFirst(cc, servers); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err) t.Fatalf("check pickfirst returned non-nil error: %v", err)
} }
@ -180,7 +181,7 @@ func (s) TestBalancerDialOption(t *testing.T) {
t.Fatalf("check roundrobin returned non-nil error: %v", err) t.Fatalf("check roundrobin returned non-nil error: %v", err)
} }
// Switch to pickfirst. // Switch to pickfirst.
cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "pick_first"}`, Addresses: addrs}) cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs})
// Balancer is still roundrobin. // Balancer is still roundrobin.
if err := checkRoundRobin(cc, servers); err != nil { if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err) t.Fatalf("check roundrobin returned non-nil error: %v", err)
@ -336,7 +337,7 @@ func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
} }
defer cc.Close() defer cc.Close()
sc := `{"loadBalancingPolicy": "round_robin"}` sc := parseCfg(`{"loadBalancingPolicy": "round_robin"}`)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc}) r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
var isRoundRobin bool var isRoundRobin bool
@ -432,7 +433,7 @@ func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
} }
sc := `{"loadBalancingPolicy": "round_robin"}` sc := parseCfg(`{"loadBalancingPolicy": "round_robin"}`)
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc}) r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
var isRoundRobin bool var isRoundRobin bool
for i := 0; i < 200; i++ { for i := 0; i < 200; i++ {
@ -509,8 +510,16 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
t.Fatalf("check pickfirst returned non-nil error: %v", err) t.Fatalf("check pickfirst returned non-nil error: %v", err)
} }
// Switch to roundrobin, and check against server[1] and server[2]. // Switch to roundrobin, and check against server[1] and server[2].
cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`, Addresses: addrs}) cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs})
if err := checkRoundRobin(cc, servers[1:]); err != nil { if err := checkRoundRobin(cc, servers[1:]); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err) t.Fatalf("check roundrobin returned non-nil error: %v", err)
} }
} }
func parseCfg(s string) serviceconfig.Config {
c, err := serviceconfig.Parse(s)
if err != nil {
panic(fmt.Sprintf("Error parsing config %q: %v", s, err))
}
return c
}

View File

@ -45,6 +45,7 @@ import (
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver. _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver. _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
@ -532,24 +533,6 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
} }
} }
// gRPC should resort to default service config when:
// * resolver service config is disabled
// * or, resolver does not return a service config or returns an invalid one.
func (cc *ClientConn) fallbackToDefaultServiceConfig(sc string) bool {
if cc.dopts.disableServiceConfig {
return true
}
// The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type.
// Right now, we assume that empty service config string means resolver does not return a config.
if sc == "" {
return true
}
// TODO: the logic below is temporary. Once we finish the logic to validate service config
// in resolver, we will replace the logic below.
_, err := parseServiceConfig(sc)
return err != nil
}
func (cc *ClientConn) updateResolverState(s resolver.State) error { func (cc *ClientConn) updateResolverState(s resolver.State) error {
cc.mu.Lock() cc.mu.Lock()
defer cc.mu.Unlock() defer cc.mu.Unlock()
@ -560,44 +543,37 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error {
return nil return nil
} }
if cc.fallbackToDefaultServiceConfig(s.ServiceConfig) { if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
if cc.dopts.defaultServiceConfig != nil && cc.sc == nil { if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
cc.applyServiceConfig(cc.dopts.defaultServiceConfig) cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
} }
} else { } else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
// TODO: the parsing logic below will be moved inside resolver. cc.applyServiceConfig(sc)
sc, err := parseServiceConfig(s.ServiceConfig)
if err != nil {
return err
}
if cc.sc == nil || cc.sc.rawJSONString != s.ServiceConfig {
cc.applyServiceConfig(sc)
}
}
// update the service config that will be sent to balancer.
if cc.sc != nil {
s.ServiceConfig = cc.sc.rawJSONString
} }
var balCfg serviceconfig.LoadBalancingConfig
if cc.dopts.balancerBuilder == nil { if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial // Only look at balancer types and switch balancer if balancer dial
// option is not set. // option is not set.
var isGRPCLB bool
for _, a := range s.Addresses {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
var newBalancerName string var newBalancerName string
// TODO: use new loadBalancerConfig field with appropriate priority. if cc.sc != nil && cc.sc.lbConfig != nil {
if isGRPCLB { newBalancerName = cc.sc.lbConfig.name
newBalancerName = grpclbName balCfg = cc.sc.lbConfig.cfg
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else { } else {
newBalancerName = PickFirstBalancerName var isGRPCLB bool
for _, a := range s.Addresses {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
}
} }
cc.switchBalancer(newBalancerName) cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil { } else if cc.balancerWrapper == nil {
@ -607,8 +583,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error {
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
} }
cc.balancerWrapper.updateResolverState(s) cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
cc.firstResolveEvent.Fire()
return nil return nil
} }

View File

@ -916,7 +916,7 @@ func (s) TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) {
// SwitchBalancer before NewAddress. There was no balancer created, this // SwitchBalancer before NewAddress. There was no balancer created, this
// makes sure we don't call close on nil balancerWrapper. // makes sure we don't call close on nil balancerWrapper.
r.UpdateState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`}) // This should not panic. r.UpdateState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)}) // This should not panic.
time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn. time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
} }
@ -932,7 +932,7 @@ func (s) TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) {
} }
// Send a new service config while closing the ClientConn. // Send a new service config while closing the ClientConn.
go cc.Close() go cc.Close()
go r.UpdateState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`}) // This should not panic. go r.UpdateState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)}) // This should not panic.
} }
} }
@ -1025,7 +1025,7 @@ func (s) TestDisableServiceConfigOption(t *testing.T) {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err) t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
} }
defer cc.Close() defer cc.Close()
r.UpdateState(resolver.State{ServiceConfig: `{ r.UpdateState(resolver.State{ServiceConfig: parseCfg(`{
"methodConfig": [ "methodConfig": [
{ {
"name": [ "name": [
@ -1037,11 +1037,11 @@ func (s) TestDisableServiceConfigOption(t *testing.T) {
"waitForReady": true "waitForReady": true
} }
] ]
}`}) }`)})
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
m := cc.GetMethodConfig("/foo/Bar") m := cc.GetMethodConfig("/foo/Bar")
if m.WaitForReady != nil { if m.WaitForReady != nil {
t.Fatalf("want: method (\"/foo/bar/\") config to be empty, got: %v", m) t.Fatalf("want: method (\"/foo/bar/\") config to be empty, got: %+v", m)
} }
} }
@ -1327,7 +1327,7 @@ func testDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T, r r
// Resolver service config gets ignored since resolver service config is disabled. // Resolver service config gets ignored since resolver service config is disabled.
r.(*manual.Resolver).UpdateState(resolver.State{ r.(*manual.Resolver).UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}}, Addresses: []resolver.Address{{Addr: addr}},
ServiceConfig: "{}", ServiceConfig: parseCfg("{}"),
}) })
if !verifyWaitForReadyEqualsTrue(cc) { if !verifyWaitForReadyEqualsTrue(cc) {
t.Fatal("default service config failed to be applied after 1s") t.Fatal("default service config failed to be applied after 1s")
@ -1356,7 +1356,7 @@ func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T
defer cc.Close() defer cc.Close()
r.(*manual.Resolver).UpdateState(resolver.State{ r.(*manual.Resolver).UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}}, Addresses: []resolver.Address{{Addr: addr}},
ServiceConfig: "{something wrong,}", ServiceConfig: nil,
}) })
if !verifyWaitForReadyEqualsTrue(cc) { if !verifyWaitForReadyEqualsTrue(cc) {
t.Fatal("default service config failed to be applied after 1s") t.Fatal("default service config failed to be applied after 1s")

View File

@ -37,6 +37,9 @@ var (
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by // KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience. // default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second KeepaliveMinPingTime = 10 * time.Second
// ParseServiceConfig is a function to parse JSON service configs into
// opaque data structures.
ParseServiceConfig func(sc string) (interface{}, error)
) )
// HealthChecker defines the signature of the client-side LB channel health checking function. // HealthChecker defines the signature of the client-side LB channel health checking function.

View File

@ -20,6 +20,10 @@
// All APIs in this package are experimental. // All APIs in this package are experimental.
package resolver package resolver
import (
"google.golang.org/grpc/serviceconfig"
)
var ( var (
// m is a map from scheme to resolver builder. // m is a map from scheme to resolver builder.
m = make(map[string]Builder) m = make(map[string]Builder)
@ -100,11 +104,12 @@ type BuildOption struct {
// State contains the current Resolver state relevant to the ClientConn. // State contains the current Resolver state relevant to the ClientConn.
type State struct { type State struct {
Addresses []Address // Resolved addresses for the target Addresses []Address // Resolved addresses for the target
ServiceConfig string // JSON representation of the service config // ServiceConfig is the parsed service config; obtained from
// serviceconfig.Parse.
ServiceConfig serviceconfig.Config
// TODO: add Err error // TODO: add Err error
// TODO: add ParsedServiceConfig interface{}
} }
// ClientConn contains the callbacks for resolver to notify any updates // ClientConn contains the callbacks for resolver to notify any updates

View File

@ -138,19 +138,22 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
return return
} }
grpclog.Infof("ccResolverWrapper: got new service config: %v", sc) grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
if channelz.IsOn() { c, err := parseServiceConfig(sc)
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: sc}) if err != nil {
return
} }
ccr.curState.ServiceConfig = sc if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: c})
}
ccr.curState.ServiceConfig = c
ccr.cc.updateResolverState(ccr.curState) ccr.cc.updateResolverState(ccr.curState)
} }
func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) { func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
if s.ServiceConfig == ccr.curState.ServiceConfig && (len(ccr.curState.Addresses) == 0) == (len(s.Addresses) == 0) {
return
}
var updates []string var updates []string
if s.ServiceConfig != ccr.curState.ServiceConfig { oldSC, oldOK := ccr.curState.ServiceConfig.(*ServiceConfig)
newSC, newOK := s.ServiceConfig.(*ServiceConfig)
if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
updates = append(updates, "service config updated") updates = append(updates, "service config updated")
} }
if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 { if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {

View File

@ -25,8 +25,11 @@ import (
"strings" "strings"
"time" "time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/serviceconfig"
) )
const maxInt = int(^uint(0) >> 1) const maxInt = int(^uint(0) >> 1)
@ -61,6 +64,11 @@ type MethodConfig struct {
retryPolicy *retryPolicy retryPolicy *retryPolicy
} }
type lbConfig struct {
name string
cfg serviceconfig.LoadBalancingConfig
}
// ServiceConfig is provided by the service provider and contains parameters for how // ServiceConfig is provided by the service provider and contains parameters for how
// clients that connect to the service should behave. // clients that connect to the service should behave.
// //
@ -68,10 +76,18 @@ type MethodConfig struct {
// through name resolver, as specified here // through name resolver, as specified here
// https://github.com/grpc/grpc/blob/master/doc/service_config.md // https://github.com/grpc/grpc/blob/master/doc/service_config.md
type ServiceConfig struct { type ServiceConfig struct {
// LB is the load balancer the service providers recommends. The balancer specified serviceconfig.Config
// via grpc.WithBalancer will override this.
// LB is the load balancer the service providers recommends. The balancer
// specified via grpc.WithBalancer will override this. This is deprecated;
// lbConfigs is preferred. If lbConfig and LB are both present, lbConfig
// will be used.
LB *string LB *string
// lbConfig is the service config's load balancing configuration. If
// lbConfig and LB are both present, lbConfig will be used.
lbConfig *lbConfig
// Methods contains a map for the methods in this service. If there is an // Methods contains a map for the methods in this service. If there is an
// exact match for a method (i.e. /service/method) in the map, use the // exact match for a method (i.e. /service/method) in the map, use the
// corresponding MethodConfig. If there's no exact match, look for the // corresponding MethodConfig. If there's no exact match, look for the
@ -233,15 +249,27 @@ type jsonMC struct {
RetryPolicy *jsonRetryPolicy RetryPolicy *jsonRetryPolicy
} }
type loadBalancingConfig map[string]json.RawMessage
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation. // TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonSC struct { type jsonSC struct {
LoadBalancingPolicy *string LoadBalancingPolicy *string
LoadBalancingConfig *[]loadBalancingConfig
MethodConfig *[]jsonMC MethodConfig *[]jsonMC
RetryThrottling *retryThrottlingPolicy RetryThrottling *retryThrottlingPolicy
HealthCheckConfig *healthCheckConfig HealthCheckConfig *healthCheckConfig
} }
func init() {
internal.ParseServiceConfig = func(sc string) (interface{}, error) {
return parseServiceConfig(sc)
}
}
func parseServiceConfig(js string) (*ServiceConfig, error) { func parseServiceConfig(js string) (*ServiceConfig, error) {
if len(js) == 0 {
return nil, fmt.Errorf("no JSON service config provided")
}
var rsc jsonSC var rsc jsonSC
err := json.Unmarshal([]byte(js), &rsc) err := json.Unmarshal([]byte(js), &rsc)
if err != nil { if err != nil {
@ -255,10 +283,38 @@ func parseServiceConfig(js string) (*ServiceConfig, error) {
healthCheckConfig: rsc.HealthCheckConfig, healthCheckConfig: rsc.HealthCheckConfig,
rawJSONString: js, rawJSONString: js,
} }
if rsc.LoadBalancingConfig != nil {
for i, lbcfg := range *rsc.LoadBalancingConfig {
if len(lbcfg) != 1 {
err := fmt.Errorf("invalid loadBalancingConfig: entry %v does not contain exactly 1 policy/config pair: %q", i, lbcfg)
grpclog.Warningf(err.Error())
return nil, err
}
var name string
var jsonCfg json.RawMessage
for name, jsonCfg = range lbcfg {
}
builder := balancer.Get(name)
if builder == nil {
continue
}
sc.lbConfig = &lbConfig{name: name}
if parser, ok := builder.(balancer.ConfigParser); ok {
var err error
sc.lbConfig.cfg, err = parser.ParseConfig(jsonCfg)
if err != nil {
return nil, fmt.Errorf("error parsing loadBalancingConfig for policy %q: %v", name, err)
}
} else if string(jsonCfg) != "{}" {
grpclog.Warningf("non-empty balancer configuration %q, but balancer does not implement ParseConfig", string(jsonCfg))
}
break
}
}
if rsc.MethodConfig == nil { if rsc.MethodConfig == nil {
return &sc, nil return &sc, nil
} }
for _, m := range *rsc.MethodConfig { for _, m := range *rsc.MethodConfig {
if m.Name == nil { if m.Name == nil {
continue continue
@ -299,11 +355,11 @@ func parseServiceConfig(js string) (*ServiceConfig, error) {
} }
if sc.retryThrottling != nil { if sc.retryThrottling != nil {
if sc.retryThrottling.MaxTokens <= 0 || if mt := sc.retryThrottling.MaxTokens; mt <= 0 || mt > 1000 {
sc.retryThrottling.MaxTokens > 1000 || return nil, fmt.Errorf("invalid retry throttling config: maxTokens (%v) out of range (0, 1000]", mt)
sc.retryThrottling.TokenRatio <= 0 { }
// Illegal throttling config; disable throttling. if tr := sc.retryThrottling.TokenRatio; tr <= 0 {
sc.retryThrottling = nil return nil, fmt.Errorf("invalid retry throttling config: tokenRatio (%v) may not be negative", tr)
} }
} }
return &sc, nil return &sc, nil

View File

@ -19,19 +19,81 @@
package grpc package grpc
import ( import (
"encoding/json"
"fmt" "fmt"
"math" "math"
"reflect" "reflect"
"testing" "testing"
"time" "time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/serviceconfig"
) )
type parseTestCase struct {
scjs string
wantSC *ServiceConfig
wantErr bool
}
func runParseTests(t *testing.T, testCases []parseTestCase) {
for _, c := range testCases {
sc, err := parseServiceConfig(c.scjs)
if !c.wantErr {
c.wantSC.rawJSONString = c.scjs
}
if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) {
t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr)
}
}
}
type pbbData struct {
serviceconfig.LoadBalancingConfig
Foo string
Bar int
}
type parseBalancerBuilder struct{}
func (parseBalancerBuilder) Name() string {
return "pbb"
}
func (parseBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
d := pbbData{}
if err := json.Unmarshal(c, &d); err != nil {
return nil, err
}
return d, nil
}
func (parseBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
panic("unimplemented")
}
func init() {
balancer.Register(parseBalancerBuilder{})
}
func (s) TestParseLBConfig(t *testing.T) {
testcases := []parseTestCase{
{
`{
"loadBalancingConfig": [{"pbb": { "foo": "hi" } }]
}`,
&ServiceConfig{
Methods: make(map[string]MethodConfig),
lbConfig: &lbConfig{name: "pbb", cfg: pbbData{Foo: "hi"}},
},
false,
},
}
runParseTests(t, testcases)
}
func (s) TestParseLoadBalancer(t *testing.T) { func (s) TestParseLoadBalancer(t *testing.T) {
testcases := []struct { testcases := []parseTestCase{
scjs string
wantSC *ServiceConfig
wantErr bool
}{
{ {
`{ `{
"loadBalancingPolicy": "round_robin", "loadBalancingPolicy": "round_robin",
@ -76,21 +138,11 @@ func (s) TestParseLoadBalancer(t *testing.T) {
true, true,
}, },
} }
runParseTests(t, testcases)
for _, c := range testcases {
sc, err := parseServiceConfig(c.scjs)
if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(sc, c.wantSC) {
t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr)
}
}
} }
func (s) TestParseWaitForReady(t *testing.T) { func (s) TestParseWaitForReady(t *testing.T) {
testcases := []struct { testcases := []parseTestCase{
scjs string
wantSC *ServiceConfig
wantErr bool
}{
{ {
`{ `{
"methodConfig": [ "methodConfig": [
@ -165,20 +217,11 @@ func (s) TestParseWaitForReady(t *testing.T) {
}, },
} }
for _, c := range testcases { runParseTests(t, testcases)
sc, err := parseServiceConfig(c.scjs)
if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(sc, c.wantSC) {
t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr)
}
}
} }
func (s) TestPraseTimeOut(t *testing.T) { func (s) TestParseTimeOut(t *testing.T) {
testcases := []struct { testcases := []parseTestCase{
scjs string
wantSC *ServiceConfig
wantErr bool
}{
{ {
`{ `{
"methodConfig": [ "methodConfig": [
@ -247,20 +290,11 @@ func (s) TestPraseTimeOut(t *testing.T) {
}, },
} }
for _, c := range testcases { runParseTests(t, testcases)
sc, err := parseServiceConfig(c.scjs)
if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(sc, c.wantSC) {
t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr)
}
}
} }
func (s) TestPraseMsgSize(t *testing.T) { func (s) TestParseMsgSize(t *testing.T) {
testcases := []struct { testcases := []parseTestCase{
scjs string
wantSC *ServiceConfig
wantErr bool
}{
{ {
`{ `{
"methodConfig": [ "methodConfig": [
@ -316,12 +350,7 @@ func (s) TestPraseMsgSize(t *testing.T) {
}, },
} }
for _, c := range testcases { runParseTests(t, testcases)
sc, err := parseServiceConfig(c.scjs)
if c.wantErr != (err != nil) || !scCompareWithRawJSONSkipped(sc, c.wantSC) {
t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr)
}
}
} }
func (s) TestParseDuration(t *testing.T) { func (s) TestParseDuration(t *testing.T) {
@ -384,15 +413,3 @@ func newDuration(b time.Duration) *time.Duration {
func newString(b string) *string { func newString(b string) *string {
return &b return &b
} }
func scCompareWithRawJSONSkipped(s1, s2 *ServiceConfig) bool {
if s1 == nil && s2 == nil {
return true
}
if (s1 == nil) != (s2 == nil) {
return false
}
s1.rawJSONString = ""
s2.rawJSONString = ""
return reflect.DeepEqual(s1, s2)
}

View File

@ -0,0 +1,46 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package serviceconfig defines types and methods for operating on gRPC
// service configs.
package serviceconfig
import (
"google.golang.org/grpc/internal"
)
// Config represents an opaque data structure holding a service config.
type Config interface {
isConfig()
}
// LoadBalancingConfig represents an opaque data structure holding a load
// balancer config.
type LoadBalancingConfig interface {
isLoadBalancingConfig()
}
// Parse parses the JSON service config provided into an internal form or
// returns an error if the config is invalid.
func Parse(ServiceConfigJSON string) (Config, error) {
c, err := internal.ParseServiceConfig(ServiceConfigJSON)
if err != nil {
return nil, err
}
return c.(Config), err
}

View File

@ -41,6 +41,7 @@ import (
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing" testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/testdata" "google.golang.org/grpc/testdata"
@ -240,7 +241,7 @@ func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`}) r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)})
// wait for the shutdown of grpclb balancer // wait for the shutdown of grpclb balancer
if err := verifyResultWithDelay(func() (bool, error) { if err := verifyResultWithDelay(func() (bool, error) {
@ -1425,7 +1426,7 @@ func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`}) r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)})
// wait for the shutdown of grpclb balancer // wait for the shutdown of grpclb balancer
if err := verifyResultWithDelay(func() (bool, error) { if err := verifyResultWithDelay(func() (bool, error) {
@ -1569,7 +1570,7 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`}) r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)})
if err := verifyResultWithDelay(func() (bool, error) { if err := verifyResultWithDelay(func() (bool, error) {
cm := channelz.GetChannel(cid) cm := channelz.GetChannel(cid)
@ -1586,7 +1587,7 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
newSC := `{ newSC := parseCfg(`{
"methodConfig": [ "methodConfig": [
{ {
"name": [ "name": [
@ -1599,19 +1600,20 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
"timeout": ".001s" "timeout": ".001s"
} }
] ]
}` }`)
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: newSC}) r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: newSC})
if err := verifyResultWithDelay(func() (bool, error) { if err := verifyResultWithDelay(func() (bool, error) {
cm := channelz.GetChannel(cid) cm := channelz.GetChannel(cid)
var es []string
for i := len(cm.Trace.Events) - 1; i >= 0; i-- { for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
if strings.Contains(cm.Trace.Events[i].Desc, "service config updated") { if strings.Contains(cm.Trace.Events[i].Desc, "service config updated") {
break break
} }
es = append(es, cm.Trace.Events[i].Desc)
if i == 0 { if i == 0 {
return false, fmt.Errorf("events do not contain expected address resolution of new service config") return false, fmt.Errorf("events do not contain expected address resolution of new service config\n Events:\n%v", strings.Join(es, "\n"))
} }
} }
return true, nil return true, nil
@ -1883,7 +1885,7 @@ func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`}) r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}}, ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)})
// wait for the shutdown of grpclb balancer // wait for the shutdown of grpclb balancer
if err := verifyResultWithDelay(func() (bool, error) { if err := verifyResultWithDelay(func() (bool, error) {
@ -2012,3 +2014,11 @@ func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
func parseCfg(s string) serviceconfig.Config {
c, err := serviceconfig.Parse(s)
if err != nil {
panic(fmt.Sprintf("Error parsing config %q: %v", s, err))
}
return c
}

View File

@ -1481,7 +1481,7 @@ func (s) TestGetMethodConfig(t *testing.T) {
addrs := []resolver.Address{{Addr: te.srvAddr}} addrs := []resolver.Address{{Addr: te.srvAddr}}
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: addrs, Addresses: addrs,
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"methodConfig": [ "methodConfig": [
{ {
"name": [ "name": [
@ -1502,7 +1502,7 @@ func (s) TestGetMethodConfig(t *testing.T) {
"waitForReady": false "waitForReady": false
} }
] ]
}`}) }`)})
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
@ -1520,7 +1520,7 @@ func (s) TestGetMethodConfig(t *testing.T) {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
} }
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: `{ r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(`{
"methodConfig": [ "methodConfig": [
{ {
"name": [ "name": [
@ -1541,7 +1541,7 @@ func (s) TestGetMethodConfig(t *testing.T) {
"waitForReady": false "waitForReady": false
} }
] ]
}`}) }`)})
// Make sure service config has been processed by grpc. // Make sure service config has been processed by grpc.
for { for {
@ -1568,7 +1568,7 @@ func (s) TestServiceConfigWaitForReady(t *testing.T) {
addrs := []resolver.Address{{Addr: te.srvAddr}} addrs := []resolver.Address{{Addr: te.srvAddr}}
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: addrs, Addresses: addrs,
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"methodConfig": [ "methodConfig": [
{ {
"name": [ "name": [
@ -1585,7 +1585,7 @@ func (s) TestServiceConfigWaitForReady(t *testing.T) {
"timeout": ".001s" "timeout": ".001s"
} }
] ]
}`}) }`)})
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
@ -1610,7 +1610,7 @@ func (s) TestServiceConfigWaitForReady(t *testing.T) {
// Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds. // Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: addrs, Addresses: addrs,
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"methodConfig": [ "methodConfig": [
{ {
"name": [ "name": [
@ -1627,7 +1627,7 @@ func (s) TestServiceConfigWaitForReady(t *testing.T) {
"timeout": ".001s" "timeout": ".001s"
} }
] ]
}`}) }`)})
// Wait for the new service config to take effect. // Wait for the new service config to take effect.
for { for {
@ -1657,7 +1657,7 @@ func (s) TestServiceConfigTimeout(t *testing.T) {
addrs := []resolver.Address{{Addr: te.srvAddr}} addrs := []resolver.Address{{Addr: te.srvAddr}}
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: addrs, Addresses: addrs,
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"methodConfig": [ "methodConfig": [
{ {
"name": [ "name": [
@ -1674,7 +1674,7 @@ func (s) TestServiceConfigTimeout(t *testing.T) {
"timeout": "3600s" "timeout": "3600s"
} }
] ]
}`}) }`)})
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
@ -1704,7 +1704,7 @@ func (s) TestServiceConfigTimeout(t *testing.T) {
// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. // Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: addrs, Addresses: addrs,
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"methodConfig": [ "methodConfig": [
{ {
"name": [ "name": [
@ -1721,7 +1721,7 @@ func (s) TestServiceConfigTimeout(t *testing.T) {
"timeout": ".000000001s" "timeout": ".000000001s"
} }
] ]
}`}) }`)})
// Wait for the new service config to take effect. // Wait for the new service config to take effect.
for { for {
@ -1767,7 +1767,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
scjs := `{ sc := parseCfg(`{
"methodConfig": [ "methodConfig": [
{ {
"name": [ "name": [
@ -1784,7 +1784,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
"maxResponseMessageBytes": 2048 "maxResponseMessageBytes": 2048
} }
] ]
}` }`)
// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv). // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
te1 := testServiceConfigSetup(t, e) te1 := testServiceConfigSetup(t, e)
@ -1796,7 +1796,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
cc1 := te1.clientConn() cc1 := te1.clientConn()
addrs := []resolver.Address{{Addr: te1.srvAddr}} addrs := []resolver.Address{{Addr: te1.srvAddr}}
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: scjs}) r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
tc := testpb.NewTestServiceClient(cc1) tc := testpb.NewTestServiceClient(cc1)
req := &testpb.SimpleRequest{ req := &testpb.SimpleRequest{
@ -1867,7 +1867,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
te2.startServer(&testServer{security: e.security}) te2.startServer(&testServer{security: e.security})
defer te2.tearDown() defer te2.tearDown()
cc2 := te2.clientConn() cc2 := te2.clientConn()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te2.srvAddr}}, ServiceConfig: scjs}) r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te2.srvAddr}}, ServiceConfig: sc})
tc = testpb.NewTestServiceClient(cc2) tc = testpb.NewTestServiceClient(cc2)
for { for {
@ -1928,7 +1928,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
defer te3.tearDown() defer te3.tearDown()
cc3 := te3.clientConn() cc3 := te3.clientConn()
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te3.srvAddr}}, ServiceConfig: scjs}) r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te3.srvAddr}}, ServiceConfig: sc})
tc = testpb.NewTestServiceClient(cc3) tc = testpb.NewTestServiceClient(cc3)
for { for {
@ -2020,7 +2020,7 @@ func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: te.srvAddr}}, Addresses: []resolver.Address{{Addr: te.srvAddr}},
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"methodConfig": [ "methodConfig": [
{ {
"name": [ "name": [
@ -2033,7 +2033,7 @@ func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
"timeout": "10s" "timeout": "10s"
} }
] ]
}`}) }`)})
// Make sure service config has been processed by grpc. // Make sure service config has been processed by grpc.
for { for {
if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil { if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
@ -5295,7 +5295,7 @@ func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption)
} }
func (ss *stubServer) newServiceConfig(sc string) { func (ss *stubServer) newServiceConfig(sc string) {
ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}, ServiceConfig: sc}) ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}, ServiceConfig: parseCfg(sc)})
} }
func (ss *stubServer) waitForReady(cc *grpc.ClientConn) error { func (ss *stubServer) waitForReady(cc *grpc.ClientConn) error {
@ -7214,7 +7214,7 @@ func (s) TestRPCWaitsForResolver(t *testing.T) {
time.Sleep(time.Second) time.Sleep(time.Second)
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: te.srvAddr}}, Addresses: []resolver.Address{{Addr: te.srvAddr}},
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"methodConfig": [ "methodConfig": [
{ {
"name": [ "name": [
@ -7226,7 +7226,7 @@ func (s) TestRPCWaitsForResolver(t *testing.T) {
"maxRequestMessageBytes": 0 "maxRequestMessageBytes": 0
} }
] ]
}`}) }`)})
}() }()
// We wait a second before providing a service config and resolving // We wait a second before providing a service config and resolving
// addresses. So this will wait for that and then honor the // addresses. So this will wait for that and then honor the

View File

@ -194,11 +194,11 @@ func (s) TestHealthCheckWatchStateChange(t *testing.T) {
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "foo" "serviceName": "foo"
} }
}`}) }`)})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok { if ok := cc.WaitForStateChange(ctx, connectivity.Idle); !ok {
@ -262,11 +262,11 @@ func (s) TestHealthCheckHealthServerNotRegistered(t *testing.T) {
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "foo" "serviceName": "foo"
} }
}`}) }`)})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
@ -306,11 +306,11 @@ func (s) TestHealthCheckWithGoAway(t *testing.T) {
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "foo" "serviceName": "foo"
} }
}`}) }`)})
// make some rpcs to make sure connection is working. // make some rpcs to make sure connection is working.
if err := verifyResultWithDelay(func() (bool, error) { if err := verifyResultWithDelay(func() (bool, error) {
@ -398,11 +398,11 @@ func (s) TestHealthCheckWithConnClose(t *testing.T) {
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "foo" "serviceName": "foo"
} }
}`}) }`)})
// make some rpcs to make sure connection is working. // make some rpcs to make sure connection is working.
if err := verifyResultWithDelay(func() (bool, error) { if err := verifyResultWithDelay(func() (bool, error) {
@ -457,11 +457,11 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
defer deferFunc() defer deferFunc()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
sc := `{ sc := parseCfg(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "foo" "serviceName": "foo"
} }
}` }`)
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: sc, ServiceConfig: sc,
@ -552,11 +552,11 @@ func (s) TestHealthCheckWithClientConnClose(t *testing.T) {
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "foo" "serviceName": "foo"
} }
}`}) }`)})
// make some rpcs to make sure connection is working. // make some rpcs to make sure connection is working.
if err := verifyResultWithDelay(func() (bool, error) { if err := verifyResultWithDelay(func() (bool, error) {
@ -630,11 +630,11 @@ func (s) TestHealthCheckWithoutReportHealthCalledAddrConnShutDown(t *testing.T)
// The serviceName "delay" is specially handled at server side, where response will not be sent // The serviceName "delay" is specially handled at server side, where response will not be sent
// back to client immediately upon receiving the request (client should receive no response until // back to client immediately upon receiving the request (client should receive no response until
// test ends). // test ends).
sc := `{ sc := parseCfg(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "delay" "serviceName": "delay"
} }
}` }`)
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: sc, ServiceConfig: sc,
@ -708,11 +708,11 @@ func (s) TestHealthCheckWithoutReportHealthCalled(t *testing.T) {
// test ends). // test ends).
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "delay" "serviceName": "delay"
} }
}`}) }`)})
select { select {
case <-hcExitChan: case <-hcExitChan:
@ -756,11 +756,11 @@ func testHealthCheckDisableWithDialOption(t *testing.T, addr string) {
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}}, Addresses: []resolver.Address{{Addr: addr}},
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "foo" "serviceName": "foo"
} }
}`}) }`)})
// send some rpcs to make sure transport has been created and is ready for use. // send some rpcs to make sure transport has been created and is ready for use.
if err := verifyResultWithDelay(func() (bool, error) { if err := verifyResultWithDelay(func() (bool, error) {
@ -795,11 +795,11 @@ func testHealthCheckDisableWithBalancer(t *testing.T, addr string) {
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: addr}}, Addresses: []resolver.Address{{Addr: addr}},
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "foo" "serviceName": "foo"
} }
}`}) }`)})
// send some rpcs to make sure transport has been created and is ready for use. // send some rpcs to make sure transport has been created and is ready for use.
if err := verifyResultWithDelay(func() (bool, error) { if err := verifyResultWithDelay(func() (bool, error) {
@ -888,11 +888,11 @@ func (s) TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "channelzSuccess" "serviceName": "channelzSuccess"
} }
}`}) }`)})
if err := verifyResultWithDelay(func() (bool, error) { if err := verifyResultWithDelay(func() (bool, error) {
cm, _ := channelz.GetTopChannels(0, 0) cm, _ := channelz.GetTopChannels(0, 0)
@ -944,11 +944,11 @@ func (s) TestHealthCheckChannelzCountingCallFailure(t *testing.T) {
r.UpdateState(resolver.State{ r.UpdateState(resolver.State{
Addresses: []resolver.Address{{Addr: lis.Addr().String()}}, Addresses: []resolver.Address{{Addr: lis.Addr().String()}},
ServiceConfig: `{ ServiceConfig: parseCfg(`{
"healthCheckConfig": { "healthCheckConfig": {
"serviceName": "channelzFailure" "serviceName": "channelzFailure"
} }
}`}) }`)})
if err := verifyResultWithDelay(func() (bool, error) { if err := verifyResultWithDelay(func() (bool, error) {
cm, _ := channelz.GetTopChannels(0, 0) cm, _ := channelz.GetTopChannels(0, 0)