xdsrouting: balancer implementation (#3746)

This commit is contained in:
Menghan Li 2020-07-21 11:55:49 -07:00 committed by GitHub
parent ca3959a1b2
commit 5f0e72845e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1477 additions and 11 deletions

View File

@ -23,4 +23,5 @@ import (
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the CDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/edsbalancer" // Register the EDS balancer
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" // Register the weighted_target balancer
_ "google.golang.org/grpc/xds/internal/balancer/xdsrouting" // Register the xds_routing balancer
)

View File

@ -0,0 +1,227 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsrouting
import (
"fmt"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal"
)
type subBalancerState struct {
state balancer.State
// stateToAggregate is the connectivity state used only for state
// aggregation. It could be different from state.ConnectivityState. For
// example when a sub-balancer transitions from TransientFailure to
// connecting, state.ConnectivityState is Connecting, but stateToAggregate
// is still TransientFailure.
stateToAggregate connectivity.State
}
func (s *subBalancerState) String() string {
return fmt.Sprintf("picker:%p,state:%v,stateToAggregate:%v", s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
}
type balancerStateAggregator struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
mu sync.Mutex
// routes, one for each matcher.
routes []route
// If started is false, no updates should be sent to the parent cc. A closed
// sub-balancer could still send pickers to this aggregator. This makes sure
// that no updates will be forwarded to parent when the whole balancer group
// and states aggregator is closed.
started bool
// All balancer IDs exist as keys in this map, even if balancer group is not
// started.
//
// If an ID is not in map, it's either removed or never added.
idToPickerState map[internal.LocalityID]*subBalancerState
}
func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger) *balancerStateAggregator {
return &balancerStateAggregator{
cc: cc,
logger: logger,
idToPickerState: make(map[internal.LocalityID]*subBalancerState),
}
}
// Start starts the aggregator. It can be called after Close to restart the
// aggretator.
func (rbsa *balancerStateAggregator) start() {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
rbsa.started = true
}
// Close closes the aggregator. When the aggregator is closed, it won't call
// parent ClientConn to upate balancer state.
func (rbsa *balancerStateAggregator) close() {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
rbsa.started = false
rbsa.clearStates()
}
// add adds a sub-balancer state with weight. It adds a place holder, and waits
// for the real sub-balancer to update state.
//
// This is called when there's a new action.
func (rbsa *balancerStateAggregator) add(id internal.LocalityID) {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
rbsa.idToPickerState[id] = &subBalancerState{
// Start everything in CONNECTING, so if one of the sub-balancers
// reports TransientFailure, the RPCs will still wait for the other
// sub-balancers.
state: balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
},
stateToAggregate: connectivity.Connecting,
}
}
// remove removes the sub-balancer state. Future updates from this sub-balancer,
// if any, will be ignored.
//
// This is called when an action is removed.
func (rbsa *balancerStateAggregator) remove(id internal.LocalityID) {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
if _, ok := rbsa.idToPickerState[id]; !ok {
return
}
// Remove id and picker from picker map. This also results in future updates
// for this ID to be ignored.
delete(rbsa.idToPickerState, id)
}
// updateRoutes updates the routes. Note that it doesn't trigger an update to
// the parent ClientConn. The caller should decide when it's necessary, and call
// buildAndUpdate.
func (rbsa *balancerStateAggregator) updateRoutes(newRoutes []route) {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
rbsa.routes = newRoutes
}
// UpdateState is called to report a balancer state change from sub-balancer.
// It's usually called by the balancer group.
//
// It calls parent ClientConn's UpdateState with the new aggregated state.
func (rbsa *balancerStateAggregator) UpdateState(id internal.LocalityID, state balancer.State) {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
pickerSt, ok := rbsa.idToPickerState[id]
if !ok {
// All state starts with an entry in pickStateMap. If ID is not in map,
// it's either removed, or never existed.
return
}
if !(pickerSt.state.ConnectivityState == connectivity.TransientFailure && state.ConnectivityState == connectivity.Connecting) {
// If old state is TransientFailure, and new state is Connecting, don't
// update the state, to prevent the aggregated state from being always
// CONNECTING. Otherwise, stateToAggregate is the same as
// state.ConnectivityState.
pickerSt.stateToAggregate = state.ConnectivityState
}
pickerSt.state = state
if !rbsa.started {
return
}
rbsa.cc.UpdateState(rbsa.build())
}
// clearState Reset everything to init state (Connecting) but keep the entry in
// map (to keep the weight).
//
// Caller must hold rbsa.mu.
func (rbsa *balancerStateAggregator) clearStates() {
for _, pState := range rbsa.idToPickerState {
pState.state = balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
}
pState.stateToAggregate = connectivity.Connecting
}
}
// buildAndUpdate combines the sub-state from each sub-balancer into one state,
// and update it to parent ClientConn.
func (rbsa *balancerStateAggregator) buildAndUpdate() {
rbsa.mu.Lock()
defer rbsa.mu.Unlock()
if !rbsa.started {
return
}
rbsa.cc.UpdateState(rbsa.build())
}
// build combines sub-states into one. The picker will do routing pick.
//
// Caller must hold rbsa.mu.
func (rbsa *balancerStateAggregator) build() balancer.State {
// TODO: the majority of this function (and UpdateState) is exactly the same
// as weighted_target's state aggregator. Try to make a general utility
// function/struct to handle the logic.
//
// One option: make a SubBalancerState that handles Update(State), including
// handling the special connecting after ready, as in UpdateState(). Then a
// function to calculate the aggregated connectivity state as in this
// function.
var readyN, connectingN int
for _, ps := range rbsa.idToPickerState {
switch ps.stateToAggregate {
case connectivity.Ready:
readyN++
case connectivity.Connecting:
connectingN++
}
}
var aggregatedState connectivity.State
switch {
case readyN > 0:
aggregatedState = connectivity.Ready
case connectingN > 0:
aggregatedState = connectivity.Connecting
default:
aggregatedState = connectivity.TransientFailure
}
// The picker's return error might not be consistent with the
// aggregatedState. Because for routing, we want to always build picker with
// all sub-pickers (not even ready sub-pickers), so even if the overall
// state is Ready, pick for certain RPCs can behave like Connecting or
// TransientFailure.
rbsa.logger.Infof("Child pickers with routes: %s, actions: %+v", rbsa.routes, rbsa.idToPickerState)
return balancer.State{
ConnectivityState: aggregatedState,
Picker: newPickerGroup(rbsa.routes, rbsa.idToPickerState),
}
}

View File

@ -0,0 +1,34 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsrouting
import (
"fmt"
"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
)
const prefix = "[xds-routing-lb %p] "
var logger = grpclog.Component("xds")
func prefixLogger(p *routingBalancer) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
}

View File

@ -0,0 +1,256 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsrouting
import (
"encoding/json"
"fmt"
"regexp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
)
const xdsRoutingName = "xds_routing_experimental"
func init() {
balancer.Register(&routingBB{})
}
type routingBB struct{}
func (rbb *routingBB) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
b := &routingBalancer{}
b.logger = prefixLogger(b)
b.stateAggregator = newBalancerStateAggregator(cc, b.logger)
b.stateAggregator.start()
b.bg = balancergroup.New(cc, b.stateAggregator, nil, b.logger)
b.bg.Start()
b.logger.Infof("Created")
return b
}
func (rbb *routingBB) Name() string {
return xdsRoutingName
}
func (rbb *routingBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}
type route struct {
action string
m *compositeMatcher
}
func (r route) String() string {
return r.m.String() + "->" + r.action
}
type routingBalancer struct {
logger *grpclog.PrefixLogger
// TODO: make this package not dependent on xds specific code. Same as for
// weighted target balancer.
bg *balancergroup.BalancerGroup
stateAggregator *balancerStateAggregator
actions map[string]actionConfig
routes []route
}
// TODO: remove this and use strings directly as keys for balancer group.
func makeLocalityFromName(name string) internal.LocalityID {
return internal.LocalityID{Region: name}
}
// TODO: remove this and use strings directly as keys for balancer group.
func getNameFromLocality(id internal.LocalityID) string {
return id.Region
}
func (rb *routingBalancer) updateActions(s balancer.ClientConnState, newConfig *lbConfig) (needRebuild bool) {
addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
var rebuildStateAndPicker bool
// Remove sub-pickers and sub-balancers that are not in the new action list.
for name := range rb.actions {
if _, ok := newConfig.actions[name]; !ok {
l := makeLocalityFromName(name)
rb.stateAggregator.remove(l)
rb.bg.Remove(l)
// Trigger a state/picker update, because we don't want `ClientConn`
// to pick this sub-balancer anymore.
rebuildStateAndPicker = true
}
}
// For sub-balancers in the new action list,
// - add to balancer group if it's new,
// - forward the address/balancer config update.
for name, newT := range newConfig.actions {
l := makeLocalityFromName(name)
if _, ok := rb.actions[name]; !ok {
// If this is a new sub-balancer, add weights to the picker map.
rb.stateAggregator.add(l)
// Then add to the balancer group.
rb.bg.Add(l, balancer.Get(newT.ChildPolicy.Name))
// Not trigger a state/picker update. Wait for the new sub-balancer
// to send its updates.
}
// Forwards all the update:
// - Addresses are from the map after splitting with hierarchy path,
// - Top level service config and attributes are the same,
// - Balancer config comes from the targets map.
//
// TODO: handle error? How to aggregate errors and return?
_ = rb.bg.UpdateClientConnState(l, balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: addressesSplit[name],
ServiceConfig: s.ResolverState.ServiceConfig,
Attributes: s.ResolverState.Attributes,
},
BalancerConfig: newT.ChildPolicy.Config,
})
}
rb.actions = newConfig.actions
return rebuildStateAndPicker
}
func routeToMatcher(r routeConfig) (*compositeMatcher, error) {
var pathMatcher pathMatcherInterface
switch {
case r.regex != "":
re, err := regexp.Compile(r.regex)
if err != nil {
return nil, fmt.Errorf("failed to compile regex %q", r.regex)
}
pathMatcher = newPathRegexMatcher(re)
case r.path != "":
pathMatcher = newPathExactMatcher(r.path)
default:
pathMatcher = newPathPrefixMatcher(r.prefix)
}
var headerMatchers []headerMatcherInterface
for _, h := range r.headers {
var matcherT headerMatcherInterface
switch {
case h.exactMatch != "":
matcherT = newHeaderExactMatcher(h.name, h.exactMatch)
case h.regexMatch != "":
re, err := regexp.Compile(h.regexMatch)
if err != nil {
return nil, fmt.Errorf("failed to compile regex %q, skipping this matcher", h.regexMatch)
}
matcherT = newHeaderRegexMatcher(h.name, re)
case h.prefixMatch != "":
matcherT = newHeaderPrefixMatcher(h.name, h.prefixMatch)
case h.suffixMatch != "":
matcherT = newHeaderSuffixMatcher(h.name, h.suffixMatch)
case h.rangeMatch != nil:
matcherT = newHeaderRangeMatcher(h.name, h.rangeMatch.start, h.rangeMatch.end)
default:
matcherT = newHeaderPresentMatcher(h.name, h.presentMatch)
}
if h.invertMatch {
matcherT = newInvertMatcher(matcherT)
}
headerMatchers = append(headerMatchers, matcherT)
}
var fractionMatcher *fractionMatcher
if r.fraction != nil {
fractionMatcher = newFractionMatcher(*r.fraction)
}
return newCompositeMatcher(pathMatcher, headerMatchers, fractionMatcher), nil
}
func routesEqual(a, b []route) bool {
if len(a) != len(b) {
return false
}
for i := range a {
aa := a[i]
bb := b[i]
if aa.action != bb.action {
return false
}
if !aa.m.equal(bb.m) {
return false
}
}
return true
}
func (rb *routingBalancer) updateRoutes(newConfig *lbConfig) (needRebuild bool, _ error) {
var newRoutes []route
for _, rt := range newConfig.routes {
newMatcher, err := routeToMatcher(rt)
if err != nil {
return false, err
}
newRoutes = append(newRoutes, route{action: rt.action, m: newMatcher})
}
rebuildStateAndPicker := !routesEqual(newRoutes, rb.routes)
rb.routes = newRoutes
if rebuildStateAndPicker {
rb.stateAggregator.updateRoutes(rb.routes)
}
return rebuildStateAndPicker, nil
}
func (rb *routingBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
newConfig, ok := s.BalancerConfig.(*lbConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
rb.logger.Infof("update with config %+v, resolver state %+v", s.BalancerConfig, s.ResolverState)
rebuildForActions := rb.updateActions(s, newConfig)
rebuildForRoutes, err := rb.updateRoutes(newConfig)
if err != nil {
return fmt.Errorf("xds_routing balancer: failed to update routes: %v", err)
}
if rebuildForActions || rebuildForRoutes {
rb.stateAggregator.buildAndUpdate()
}
return nil
}
func (rb *routingBalancer) ResolverError(err error) {
rb.bg.ResolverError(err)
}
func (rb *routingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
rb.bg.UpdateSubConnState(sc, state)
}
func (rb *routingBalancer) Close() {
rb.stateAggregator.close()
rb.bg.Close()
}

View File

@ -28,7 +28,7 @@ import (
xdsclient "google.golang.org/grpc/xds/internal/client"
)
type action struct {
type actionConfig struct {
// ChildPolicy is the child policy and it's config.
ChildPolicy *internalserviceconfig.BalancerConfig
}
@ -48,7 +48,7 @@ type headerMatcher struct {
presentMatch bool
}
type route struct {
type routeConfig struct {
// Path, Prefix and Regex can have at most one set. This is guaranteed by
// config parsing.
path, prefix, regex string
@ -63,8 +63,8 @@ type route struct {
// lbConfig is the balancer config for xds routing policy.
type lbConfig struct {
serviceconfig.LoadBalancingConfig
routes []route
actions map[string]action
routes []routeConfig
actions map[string]actionConfig
}
// The following structs with `JSON` in name are temporary structs to unmarshal
@ -84,13 +84,13 @@ type routeJSON struct {
// lbConfigJSON is temporary struct for json unmarshal.
type lbConfigJSON struct {
Route []routeJSON
Action map[string]action
Action map[string]actionConfig
}
func (jc lbConfigJSON) toLBConfig() *lbConfig {
var ret lbConfig
for _, r := range jc.Route {
var tempR route
var tempR routeConfig
switch {
case r.Path != nil:
tempR.path = *r.Path

View File

@ -294,14 +294,14 @@ func Test_parseConfig(t *testing.T) {
name: "OK with path matchers only",
js: testJSONConfig,
want: &lbConfig{
routes: []route{
routes: []routeConfig{
{path: "/service_1/method_1", action: "cds:cluster_1"},
{path: "/service_1/method_2", action: "cds:cluster_1"},
{prefix: "/service_2/method_1", action: "weighted:cluster_1_cluster_2_1"},
{prefix: "/service_2", action: "weighted:cluster_1_cluster_2_1"},
{regex: "^/service_2/method_3$", action: "weighted:cluster_1_cluster_3_1"},
},
actions: map[string]action{
actions: map[string]actionConfig{
"cds:cluster_1": {ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: cdsName, Config: cdsConfig1},
},
@ -319,7 +319,7 @@ func Test_parseConfig(t *testing.T) {
name: "OK with all matchers",
js: testJSONConfigWithAllMatchers,
want: &lbConfig{
routes: []route{
routes: []routeConfig{
{path: "/service_1/method_1", action: "cds:cluster_1"},
{prefix: "/service_2/method_1", action: "cds:cluster_1"},
{regex: "^/service_2/method_3$", action: "cds:cluster_1"},
@ -332,7 +332,7 @@ func Test_parseConfig(t *testing.T) {
{prefix: "", headers: []headerMatcher{{name: "header-1", suffixMatch: "value-1"}}, action: "cds:cluster_2"},
{prefix: "", fraction: newUInt32P(31415), action: "cds:cluster_3"},
},
actions: map[string]action{
actions: map[string]actionConfig{
"cds:cluster_1": {ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: cdsName, Config: cdsConfig1},
},
@ -348,7 +348,7 @@ func Test_parseConfig(t *testing.T) {
},
}
cmpOptions := []cmp.Option{cmp.AllowUnexported(lbConfig{}, route{}, headerMatcher{}, int64Range{})}
cmpOptions := []cmp.Option{cmp.AllowUnexported(lbConfig{}, routeConfig{}, headerMatcher{}, int64Range{})}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

View File

@ -0,0 +1,61 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsrouting
import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal"
)
// pickerGroup contains a list of route matchers and their corresponding
// pickers. For each pick, the first matched picker is used. If the picker isn't
// ready, the pick will be queued.
type pickerGroup struct {
routes []route
pickers map[string]balancer.Picker
}
func newPickerGroup(routes []route, idToPickerState map[internal.LocalityID]*subBalancerState) *pickerGroup {
pickers := make(map[string]balancer.Picker)
for id, st := range idToPickerState {
pickers[getNameFromLocality(id)] = st.state.Picker
}
return &pickerGroup{
routes: routes,
pickers: pickers,
}
}
var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found")
func (pg *pickerGroup) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
for _, rt := range pg.routes {
if rt.m.match(info) {
// action from route is the ID for the sub-balancer to use.
p, ok := pg.pickers[rt.action]
if !ok {
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
return p.Pick(info)
}
}
return balancer.PickResult{}, errNoMatchedRouteFound
}

View File

@ -0,0 +1,177 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsrouting
import (
"testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
)
var (
testPickers = []*testutils.TestConstPicker{
{SC: testutils.TestSubConns[0]},
{SC: testutils.TestSubConns[1]},
}
)
func (s) TestRoutingPickerGroupPick(t *testing.T) {
tests := []struct {
name string
routes []route
pickers map[internal.LocalityID]*subBalancerState
info balancer.PickInfo
want balancer.PickResult
wantErr error
}{
{
name: "empty",
wantErr: errNoMatchedRouteFound,
},
{
name: "one route no match",
routes: []route{
{m: newCompositeMatcher(newPathPrefixMatcher("/a/"), nil, nil), action: "action-0"},
},
pickers: map[internal.LocalityID]*subBalancerState{
makeLocalityFromName("action-0"): {state: balancer.State{
ConnectivityState: connectivity.Ready,
Picker: testPickers[0],
}},
},
info: balancer.PickInfo{FullMethodName: "/z/y"},
wantErr: errNoMatchedRouteFound,
},
{
name: "one route one match",
routes: []route{
{m: newCompositeMatcher(newPathPrefixMatcher("/a/"), nil, nil), action: "action-0"},
},
pickers: map[internal.LocalityID]*subBalancerState{
makeLocalityFromName("action-0"): {state: balancer.State{
ConnectivityState: connectivity.Ready,
Picker: testPickers[0],
}},
},
info: balancer.PickInfo{FullMethodName: "/a/b"},
want: balancer.PickResult{SubConn: testutils.TestSubConns[0]},
},
{
name: "two routes first match",
routes: []route{
{m: newCompositeMatcher(newPathPrefixMatcher("/a/"), nil, nil), action: "action-0"},
{m: newCompositeMatcher(newPathPrefixMatcher("/z/"), nil, nil), action: "action-1"},
},
pickers: map[internal.LocalityID]*subBalancerState{
makeLocalityFromName("action-0"): {state: balancer.State{
ConnectivityState: connectivity.Ready,
Picker: testPickers[0],
}},
makeLocalityFromName("action-1"): {state: balancer.State{
ConnectivityState: connectivity.Ready,
Picker: testPickers[1],
}},
},
info: balancer.PickInfo{FullMethodName: "/a/b"},
want: balancer.PickResult{SubConn: testutils.TestSubConns[0]},
},
{
name: "two routes second match",
routes: []route{
{m: newCompositeMatcher(newPathPrefixMatcher("/a/"), nil, nil), action: "action-0"},
{m: newCompositeMatcher(newPathPrefixMatcher("/z/"), nil, nil), action: "action-1"},
},
pickers: map[internal.LocalityID]*subBalancerState{
makeLocalityFromName("action-0"): {state: balancer.State{
ConnectivityState: connectivity.Ready,
Picker: testPickers[0],
}},
makeLocalityFromName("action-1"): {state: balancer.State{
ConnectivityState: connectivity.Ready,
Picker: testPickers[1],
}},
},
info: balancer.PickInfo{FullMethodName: "/z/y"},
want: balancer.PickResult{SubConn: testutils.TestSubConns[1]},
},
{
name: "two routes both match former more specific",
routes: []route{
{m: newCompositeMatcher(newPathExactMatcher("/a/b"), nil, nil), action: "action-0"},
{m: newCompositeMatcher(newPathPrefixMatcher("/a/"), nil, nil), action: "action-1"},
},
pickers: map[internal.LocalityID]*subBalancerState{
makeLocalityFromName("action-0"): {state: balancer.State{
ConnectivityState: connectivity.Ready,
Picker: testPickers[0],
}},
makeLocalityFromName("action-1"): {state: balancer.State{
ConnectivityState: connectivity.Ready,
Picker: testPickers[1],
}},
},
info: balancer.PickInfo{FullMethodName: "/a/b"},
// First route is a match, so first action is picked.
want: balancer.PickResult{SubConn: testutils.TestSubConns[0]},
},
{
name: "tow routes both match latter more specific",
routes: []route{
{m: newCompositeMatcher(newPathPrefixMatcher("/a/"), nil, nil), action: "action-0"},
{m: newCompositeMatcher(newPathExactMatcher("/a/b"), nil, nil), action: "action-1"},
},
pickers: map[internal.LocalityID]*subBalancerState{
makeLocalityFromName("action-0"): {state: balancer.State{
ConnectivityState: connectivity.Ready,
Picker: testPickers[0],
}},
makeLocalityFromName("action-1"): {state: balancer.State{
ConnectivityState: connectivity.Ready,
Picker: testPickers[1],
}},
},
info: balancer.PickInfo{FullMethodName: "/a/b"},
// First route is a match, so first action is picked, even though
// second is an exact match.
want: balancer.PickResult{SubConn: testutils.TestSubConns[0]},
},
}
cmpOpts := []cmp.Option{cmp.AllowUnexported(testutils.TestSubConn{})}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pg := newPickerGroup(tt.routes, tt.pickers)
got, err := pg.Pick(tt.info)
t.Logf("Pick(%+v) = {%+v, %+v}", tt.info, got, err)
if err != tt.wantErr {
t.Errorf("Pick() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !cmp.Equal(got, tt.want, cmpOpts...) {
t.Errorf("Pick() got = %v, want %v, diff %s", got, tt.want, cmp.Diff(got, tt.want, cmpOpts...))
}
})
}
}

View File

@ -0,0 +1,710 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsrouting
import (
"context"
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/testutils"
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
var (
rtBuilder balancer.Builder
rtParser balancer.ConfigParser
testBackendAddrStrs []string
)
const ignoreAttrsRRName = "ignore_attrs_round_robin"
type ignoreAttrsRRBuilder struct {
balancer.Builder
}
func (trrb *ignoreAttrsRRBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &ignoreAttrsRRBalancer{trrb.Builder.Build(cc, opts)}
}
func (*ignoreAttrsRRBuilder) Name() string {
return ignoreAttrsRRName
}
// ignoreAttrsRRBalancer clears attributes from all addresses.
//
// It's necessary in this tests because hierarchy modifies address.Attributes.
// Even if rr gets addresses with empty hierarchy, the attributes fields are
// different. This is a temporary walkaround for the tests to ignore attributes.
// Eventually, we need a way for roundrobin to know that two addresses with
// empty attributes are equal.
//
// TODO: delete this when the issue is resolved:
// https://github.com/grpc/grpc-go/issues/3611.
type ignoreAttrsRRBalancer struct {
balancer.Balancer
}
func (trrb *ignoreAttrsRRBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
var newAddrs []resolver.Address
for _, a := range s.ResolverState.Addresses {
a.Attributes = nil
newAddrs = append(newAddrs, a)
}
s.ResolverState.Addresses = newAddrs
return trrb.Balancer.UpdateClientConnState(s)
}
const testBackendAddrsCount = 12
func init() {
for i := 0; i < testBackendAddrsCount; i++ {
testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
}
rtBuilder = balancer.Get(xdsRoutingName)
rtParser = rtBuilder.(balancer.ConfigParser)
balancer.Register(&ignoreAttrsRRBuilder{balancer.Get(roundrobin.Name)})
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
}
func testPick(t *testing.T, p balancer.Picker, info balancer.PickInfo, wantSC balancer.SubConn, wantErr error) {
t.Helper()
for i := 0; i < 5; i++ {
gotSCSt, err := p.Pick(info)
if err != wantErr {
t.Fatalf("picker.Pick(%+v), got error %v, want %v", info, err, wantErr)
}
if !cmp.Equal(gotSCSt.SubConn, wantSC, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick(%+v), got %v, want SubConn=%v", info, gotSCSt, wantSC)
}
}
}
func TestRouting(t *testing.T) {
cc := testutils.NewTestClientConn(t)
rtb := rtBuilder.Build(cc, balancer.BuildOptions{})
configJSON1 := `{
"Action": {
"cds:cluster_1":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
"cds:cluster_2":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
},
"Route": [
{"prefix":"/a/", "action":"cds:cluster_1"},
{"prefix":"", "headers":[{"name":"header-1", "exactMatch":"value-1"}], "action":"cds:cluster_2"}
]
}`
config1, err := rtParser.ParseConfig([]byte(configJSON1))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], Attributes: nil},
{Addr: testBackendAddrStrs[1], Attributes: nil},
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
m1 := make(map[resolver.Address]balancer.SubConn)
// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
for range wantAddrs {
addrs := <-cc.NewSubConnAddrsCh
if len(hierarchy.Get(addrs[0])) != 0 {
t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes)
}
sc := <-cc.NewSubConnCh
// Clear the attributes before adding to map.
addrs[0].Attributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
p1 := <-cc.NewPickerCh
for _, tt := range []struct {
pickInfo balancer.PickInfo
wantSC balancer.SubConn
wantErr error
}{
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
wantSC: m1[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
wantSC: m1[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{
FullMethodName: "/z/y",
Ctx: metadata.NewOutgoingContext(context.Background(), metadata.Pairs("header-1", "value-1")),
},
wantSC: m1[wantAddrs[1]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{
FullMethodName: "/z/y",
Ctx: metadata.NewOutgoingContext(context.Background(), metadata.Pairs("h", "v")),
},
wantSC: nil,
wantErr: errNoMatchedRouteFound,
},
} {
testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
}
}
// TestRoutingConfigUpdateAddRoute covers the cases the routing balancer
// receives config update with extra route, but the same actions.
func TestRoutingConfigUpdateAddRoute(t *testing.T) {
cc := testutils.NewTestClientConn(t)
rtb := rtBuilder.Build(cc, balancer.BuildOptions{})
configJSON1 := `{
"Action": {
"cds:cluster_1":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
"cds:cluster_2":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
},
"Route": [
{"prefix":"/a/", "action":"cds:cluster_1"},
{"path":"/z/y", "action":"cds:cluster_2"}
]
}`
config1, err := rtParser.ParseConfig([]byte(configJSON1))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], Attributes: nil},
{Addr: testBackendAddrStrs[1], Attributes: nil},
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
m1 := make(map[resolver.Address]balancer.SubConn)
// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
for range wantAddrs {
addrs := <-cc.NewSubConnAddrsCh
if len(hierarchy.Get(addrs[0])) != 0 {
t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes)
}
sc := <-cc.NewSubConnCh
// Clear the attributes before adding to map.
addrs[0].Attributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
p1 := <-cc.NewPickerCh
for _, tt := range []struct {
pickInfo balancer.PickInfo
wantSC balancer.SubConn
wantErr error
}{
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
wantSC: m1[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
wantSC: m1[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/z/y"},
wantSC: m1[wantAddrs[1]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/c/d"},
wantSC: nil,
wantErr: errNoMatchedRouteFound,
},
} {
testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
}
// A config update with different routes, but the same actions. Expect a
// picker update, but no subconn changes.
configJSON2 := `{
"Action": {
"cds:cluster_1":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
"cds:cluster_2":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
},
"Route": [
{"prefix":"", "headers":[{"name":"header-1", "presentMatch":true}], "action":"cds:cluster_2"},
{"prefix":"/a/", "action":"cds:cluster_1"},
{"path":"/z/y", "action":"cds:cluster_2"}
]
}`
config2, err := rtParser.ParseConfig([]byte(configJSON2))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send update with the same addresses.
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
}},
BalancerConfig: config2,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// New change to actions, expect no newSubConn.
select {
case <-time.After(time.Millisecond * 500):
case <-cc.NewSubConnCh:
addrs := <-cc.NewSubConnAddrsCh
t.Fatalf("unexpected NewSubConn with address %v", addrs)
}
p2 := <-cc.NewPickerCh
for _, tt := range []struct {
pickInfo balancer.PickInfo
wantSC balancer.SubConn
wantErr error
}{
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
wantSC: m1[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
wantSC: m1[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/z/y"},
wantSC: m1[wantAddrs[1]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{
FullMethodName: "/a/z",
Ctx: metadata.NewOutgoingContext(context.Background(), metadata.Pairs("header-1", "value-1")),
},
wantSC: m1[wantAddrs[1]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{
FullMethodName: "/c/d",
Ctx: metadata.NewOutgoingContext(context.Background(), metadata.Pairs("h", "v")),
},
wantSC: nil,
wantErr: errNoMatchedRouteFound,
},
} {
testPick(t, p2, tt.pickInfo, tt.wantSC, tt.wantErr)
}
}
// TestRoutingConfigUpdateAddRouteAndAction covers the cases the routing
// balancer receives config update with extra route and actions.
func TestRoutingConfigUpdateAddRouteAndAction(t *testing.T) {
cc := testutils.NewTestClientConn(t)
rtb := rtBuilder.Build(cc, balancer.BuildOptions{})
configJSON1 := `{
"Action": {
"cds:cluster_1":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
"cds:cluster_2":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
},
"Route": [
{"prefix":"/a/", "action":"cds:cluster_1"},
{"path":"/z/y", "action":"cds:cluster_2"}
]
}`
config1, err := rtParser.ParseConfig([]byte(configJSON1))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], Attributes: nil},
{Addr: testBackendAddrStrs[1], Attributes: nil},
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
m1 := make(map[resolver.Address]balancer.SubConn)
// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
for range wantAddrs {
addrs := <-cc.NewSubConnAddrsCh
if len(hierarchy.Get(addrs[0])) != 0 {
t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes)
}
sc := <-cc.NewSubConnCh
// Clear the attributes before adding to map.
addrs[0].Attributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
p1 := <-cc.NewPickerCh
for _, tt := range []struct {
pickInfo balancer.PickInfo
wantSC balancer.SubConn
wantErr error
}{
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
wantSC: m1[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
wantSC: m1[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/z/y"},
wantSC: m1[wantAddrs[1]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/c/d"},
wantSC: nil,
wantErr: errNoMatchedRouteFound,
},
} {
testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
}
// A config update with different routes, and different actions. Expect a
// new subconn and a picker update.
configJSON2 := `{
"Action": {
"cds:cluster_1":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
"cds:cluster_2":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
"cds:cluster_3":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
},
"Route": [
{"prefix":"", "headers":[{"name":"header-1", "presentMatch":false, "invertMatch":true}], "action":"cds:cluster_3"},
{"prefix":"/a/", "action":"cds:cluster_1"},
{"path":"/z/y", "action":"cds:cluster_2"}
]
}`
config2, err := rtParser.ParseConfig([]byte(configJSON2))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
wantAddrs = append(wantAddrs, resolver.Address{Addr: testBackendAddrStrs[2], Attributes: nil})
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
hierarchy.Set(wantAddrs[2], []string{"cds:cluster_3"}),
}},
BalancerConfig: config2,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Expect exactly one new subconn.
addrs := <-cc.NewSubConnAddrsCh
if len(hierarchy.Get(addrs[0])) != 0 {
t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes)
}
sc := <-cc.NewSubConnCh
// Clear the attributes before adding to map.
addrs[0].Attributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Should have no more newSubConn.
select {
case <-time.After(time.Millisecond * 500):
case <-cc.NewSubConnCh:
addrs := <-cc.NewSubConnAddrsCh
t.Fatalf("unexpected NewSubConn with address %v", addrs)
}
p2 := <-cc.NewPickerCh
for _, tt := range []struct {
pickInfo balancer.PickInfo
wantSC balancer.SubConn
wantErr error
}{
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
wantSC: m1[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
wantSC: m1[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/z/y"},
wantSC: m1[wantAddrs[1]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{
FullMethodName: "/a/z",
Ctx: metadata.NewOutgoingContext(context.Background(), metadata.Pairs("header-1", "value-1")),
},
wantSC: m1[wantAddrs[2]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{
FullMethodName: "/c/d",
Ctx: metadata.NewOutgoingContext(context.Background(), metadata.Pairs("h", "v")),
},
wantSC: nil,
wantErr: errNoMatchedRouteFound,
},
} {
testPick(t, p2, tt.pickInfo, tt.wantSC, tt.wantErr)
}
}
// TestRoutingConfigUpdateDeleteAll covers the cases the routing balancer receives config
// update with no routes. Pick should fail with details in error.
func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
cc := testutils.NewTestClientConn(t)
rtb := rtBuilder.Build(cc, balancer.BuildOptions{})
configJSON1 := `{
"Action": {
"cds:cluster_1":{ "childPolicy": [{"ignore_attrs_round_robin":""}] },
"cds:cluster_2":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
},
"Route": [
{"prefix":"/a/", "action":"cds:cluster_1"},
{"path":"/z/y", "action":"cds:cluster_2"}
]
}`
config1, err := rtParser.ParseConfig([]byte(configJSON1))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], Attributes: nil},
{Addr: testBackendAddrStrs[1], Attributes: nil},
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
m1 := make(map[resolver.Address]balancer.SubConn)
// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
for range wantAddrs {
addrs := <-cc.NewSubConnAddrsCh
if len(hierarchy.Get(addrs[0])) != 0 {
t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes)
}
sc := <-cc.NewSubConnCh
// Clear the attributes before adding to map.
addrs[0].Attributes = nil
m1[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
p1 := <-cc.NewPickerCh
for _, tt := range []struct {
pickInfo balancer.PickInfo
wantSC balancer.SubConn
wantErr error
}{
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
wantSC: m1[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
wantSC: m1[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/z/y"},
wantSC: m1[wantAddrs[1]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/c/d"},
wantSC: nil,
wantErr: errNoMatchedRouteFound,
},
} {
testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
}
// A config update with no routes.
configJSON2 := `{}`
config2, err := rtParser.ParseConfig([]byte(configJSON2))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: config2,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Expect two remove subconn.
for range wantAddrs {
select {
case <-time.After(time.Millisecond * 500):
t.Fatalf("timeout waiting for remove subconn")
case <-cc.RemoveSubConnCh:
}
}
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, err := p2.Pick(balancer.PickInfo{})
if err != errNoMatchedRouteFound {
t.Fatalf("picker.Pick, got %v, %v, want error %v", gotSCSt, err, errNoMatchedRouteFound)
}
}
// Resend the previous config with routes and actions.
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
m2 := make(map[resolver.Address]balancer.SubConn)
// Verify that a subconn is created with the address, and the hierarchy path
// in the address is cleared.
for range wantAddrs {
addrs := <-cc.NewSubConnAddrsCh
if len(hierarchy.Get(addrs[0])) != 0 {
t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].Attributes)
}
sc := <-cc.NewSubConnCh
// Clear the attributes before adding to map.
addrs[0].Attributes = nil
m2[addrs[0]] = sc
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
}
p3 := <-cc.NewPickerCh
for _, tt := range []struct {
pickInfo balancer.PickInfo
wantSC balancer.SubConn
wantErr error
}{
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/0"},
wantSC: m2[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/a/1"},
wantSC: m2[wantAddrs[0]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/z/y"},
wantSC: m2[wantAddrs[1]],
wantErr: nil,
},
{
pickInfo: balancer.PickInfo{FullMethodName: "/c/d"},
wantSC: nil,
wantErr: errNoMatchedRouteFound,
},
} {
testPick(t, p3, tt.pickInfo, tt.wantSC, tt.wantErr)
}
}