mirror of https://github.com/grpc/grpc-go.git
cluster_resolver: add functions to build child balancer config (#4429)
This commit is contained in:
parent
3508452162
commit
e7b12ef3b1
|
|
@ -0,0 +1,297 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
// Package balancerconfig contains utility functions to build balancer config.
|
||||
// The built config will generate a tree of balancers with priority,
|
||||
// cluster_impl, weighted_target, lrs, and roundrobin.
|
||||
//
|
||||
// This is in a subpackage of cluster_resolver so that it can be used by the EDS
|
||||
// balancer. Eventually we will delete the EDS balancer, and replace it with
|
||||
// cluster_resolver, then we can move the functions to package cluster_resolver,
|
||||
// and unexport them.
|
||||
//
|
||||
// TODO: move and unexport. Read above.
|
||||
package balancerconfig
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"google.golang.org/grpc/balancer/roundrobin"
|
||||
"google.golang.org/grpc/balancer/weightedroundrobin"
|
||||
"google.golang.org/grpc/internal/hierarchy"
|
||||
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
|
||||
"google.golang.org/grpc/xds/internal/balancer/lrs"
|
||||
"google.golang.org/grpc/xds/internal/balancer/priority"
|
||||
"google.golang.org/grpc/xds/internal/balancer/weightedtarget"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
)
|
||||
|
||||
const million = 1000000
|
||||
|
||||
// PriorityConfig is config for one priority. For example, if there an EDS and a
|
||||
// DNS, the priority list will be [priorityConfig{EDS}, PriorityConfig{DNS}].
|
||||
//
|
||||
// Each PriorityConfig corresponds to one discovery mechanism from the LBConfig
|
||||
// generated by the CDS balancer. The CDS balancer resolves the cluster name to
|
||||
// an ordered list of discovery mechanisms (if the top cluster is an aggregated
|
||||
// cluster), one for each underlying cluster.
|
||||
type PriorityConfig struct {
|
||||
Mechanism DiscoveryMechanism
|
||||
// EDSResp is set only if type is EDS.
|
||||
EDSResp xdsclient.EndpointsUpdate
|
||||
// Addresses is set only if type is DNS.
|
||||
Addresses []string
|
||||
}
|
||||
|
||||
// BuildPriorityConfigJSON builds balancer config for the passed in
|
||||
// priorities.
|
||||
//
|
||||
// The built tree of balancers (see test for the output struct).
|
||||
//
|
||||
// ┌────────┐
|
||||
// │priority│
|
||||
// └┬──────┬┘
|
||||
// │ │
|
||||
// ┌───────────▼┐ ┌▼───────────┐
|
||||
// │cluster_impl│ │cluster_impl│
|
||||
// └─┬──────────┘ └──────────┬─┘
|
||||
// │ │
|
||||
// ┌──────────────▼─┐ ┌─▼──────────────┐
|
||||
// │locality_picking│ │locality_picking│
|
||||
// └┬──────────────┬┘ └┬──────────────┬┘
|
||||
// │ │ │ │
|
||||
// ┌─▼─┐ ┌─▼─┐ ┌─▼─┐ ┌─▼─┐
|
||||
// │LRS│ │LRS│ │LRS│ │LRS│
|
||||
// └─┬─┘ └─┬─┘ └─┬─┘ └─┬─┘
|
||||
// │ │ │ │
|
||||
// ┌──────────▼─────┐ ┌─────▼──────────┐ ┌──────────▼─────┐ ┌─────▼──────────┐
|
||||
// │endpoint_picking│ │endpoint_picking│ │endpoint_picking│ │endpoint_picking│
|
||||
// └────────────────┘ └────────────────┘ └────────────────┘ └────────────────┘
|
||||
//
|
||||
// If endpointPickingPolicy is nil, roundrobin will be used.
|
||||
//
|
||||
// Custom locality picking policy isn't support, and weighted_target is always
|
||||
// used.
|
||||
//
|
||||
// TODO: support setting locality picking policy, and add a parameter for
|
||||
// locality picking policy.
|
||||
func BuildPriorityConfigJSON(priorities []PriorityConfig, endpointPickingPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) {
|
||||
pc, addrs := buildPriorityConfig(priorities, endpointPickingPolicy)
|
||||
ret, err := json.Marshal(pc)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err)
|
||||
}
|
||||
return ret, addrs, nil
|
||||
}
|
||||
|
||||
func buildPriorityConfig(priorities []PriorityConfig, endpointPickingPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address) {
|
||||
var (
|
||||
retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)}
|
||||
retAddrs []resolver.Address
|
||||
)
|
||||
for i, p := range priorities {
|
||||
switch p.Mechanism.Type {
|
||||
case DiscoveryMechanismTypeEDS:
|
||||
names, configs, addrs := buildClusterImplConfigForEDS(i, p.EDSResp, p.Mechanism, endpointPickingPolicy)
|
||||
retConfig.Priorities = append(retConfig.Priorities, names...)
|
||||
for n, c := range configs {
|
||||
retConfig.Children[n] = &priority.Child{
|
||||
Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: c},
|
||||
// Ignore all re-resolution from EDS children.
|
||||
IgnoreReresolutionRequests: true,
|
||||
}
|
||||
}
|
||||
retAddrs = append(retAddrs, addrs...)
|
||||
case DiscoveryMechanismTypeLogicalDNS:
|
||||
name, config, addrs := buildClusterImplConfigForDNS(i, p.Addresses)
|
||||
retConfig.Priorities = append(retConfig.Priorities, name)
|
||||
retConfig.Children[name] = &priority.Child{
|
||||
Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: config},
|
||||
// Not ignore re-resolution from DNS children, they will trigger
|
||||
// DNS to re-resolve.
|
||||
IgnoreReresolutionRequests: false,
|
||||
}
|
||||
retAddrs = append(retAddrs, addrs...)
|
||||
}
|
||||
}
|
||||
return retConfig, retAddrs
|
||||
}
|
||||
|
||||
func buildClusterImplConfigForDNS(parentPriority int, addrStrs []string) (string, *clusterimpl.LBConfig, []resolver.Address) {
|
||||
// Endpoint picking policy for DNS is hardcoded to pick_first.
|
||||
const childPolicy = "pick_first"
|
||||
var retAddrs []resolver.Address
|
||||
pName := fmt.Sprintf("priority-%v", parentPriority)
|
||||
for _, addrStr := range addrStrs {
|
||||
retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
|
||||
}
|
||||
return pName, &clusterimpl.LBConfig{ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy}}, retAddrs
|
||||
}
|
||||
|
||||
// buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for
|
||||
// each priority, sorted by priority, and the addresses for each priority (with
|
||||
// hierarchy attributes set).
|
||||
//
|
||||
// For example, if there are two priorities, the returned values will be
|
||||
// - ["p0", "p1"]
|
||||
// - map{"p0":p0_config, "p1":p1_config}
|
||||
// - [p0_address_0, p0_address_1, p1_address_0, p1_address_1]
|
||||
// - p0 addresses' hierarchy attributes are set to p0
|
||||
func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsclient.EndpointsUpdate, mechanism DiscoveryMechanism, endpointPickingPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address) {
|
||||
var (
|
||||
retNames []string
|
||||
retAddrs []resolver.Address
|
||||
retConfigs = make(map[string]*clusterimpl.LBConfig)
|
||||
)
|
||||
|
||||
if endpointPickingPolicy == nil {
|
||||
endpointPickingPolicy = &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}
|
||||
}
|
||||
|
||||
drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops))
|
||||
for _, d := range edsResp.Drops {
|
||||
drops = append(drops, clusterimpl.DropConfig{
|
||||
Category: d.Category,
|
||||
RequestsPerMillion: d.Numerator * million / d.Denominator,
|
||||
})
|
||||
}
|
||||
|
||||
priorityChildNames, priorities := groupLocalitiesByPriority(edsResp.Localities)
|
||||
for _, priorityName := range priorityChildNames {
|
||||
priorityLocalities := priorities[priorityName]
|
||||
// Prepend parent priority to the priority names, to avoid duplicates.
|
||||
pName := fmt.Sprintf("priority-%v-%v", parentPriority, priorityName)
|
||||
retNames = append(retNames, pName)
|
||||
wtConfig, addrs := localitiesToWeightedTarget(priorityLocalities, pName, endpointPickingPolicy, mechanism.LoadReportingServerName, mechanism.Cluster, mechanism.EDSServiceName)
|
||||
retConfigs[pName] = &clusterimpl.LBConfig{
|
||||
Cluster: mechanism.Cluster,
|
||||
EDSServiceName: mechanism.EDSServiceName,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: weightedtarget.Name, Config: wtConfig},
|
||||
LoadReportingServerName: mechanism.LoadReportingServerName,
|
||||
MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
|
||||
DropCategories: drops,
|
||||
}
|
||||
retAddrs = append(retAddrs, addrs...)
|
||||
}
|
||||
|
||||
return retNames, retConfigs, retAddrs
|
||||
}
|
||||
|
||||
// groupLocalitiesByPriority returns the localities grouped by priority.
|
||||
//
|
||||
// It also returns a list of strings where each string represents a priority,
|
||||
// and the list is sorted from higher priority to lower priority.
|
||||
//
|
||||
// For example, for L0-p0, L1-p0, L2-p1, results will be
|
||||
// - ["p0", "p1"]
|
||||
// - map{"p0":[L0, L1], "p1":[L2]}
|
||||
func groupLocalitiesByPriority(localities []xdsclient.Locality) ([]string, map[string][]xdsclient.Locality) {
|
||||
var priorityIntSlice []int
|
||||
priorities := make(map[string][]xdsclient.Locality)
|
||||
for _, locality := range localities {
|
||||
if locality.Weight == 0 {
|
||||
continue
|
||||
}
|
||||
priorityName := fmt.Sprintf("%v", locality.Priority)
|
||||
priorities[priorityName] = append(priorities[priorityName], locality)
|
||||
priorityIntSlice = append(priorityIntSlice, int(locality.Priority))
|
||||
}
|
||||
// Sort the priorities based on the int value, deduplicate, and then turn
|
||||
// the sorted list into a string list. This will be child names, in priority
|
||||
// order.
|
||||
sort.Ints(priorityIntSlice)
|
||||
priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice)
|
||||
priorityNameSlice := make([]string, 0, len(priorityIntSliceDeduped))
|
||||
for _, p := range priorityIntSliceDeduped {
|
||||
priorityNameSlice = append(priorityNameSlice, fmt.Sprintf("%v", p))
|
||||
}
|
||||
return priorityNameSlice, priorities
|
||||
}
|
||||
|
||||
func dedupSortedIntSlice(a []int) []int {
|
||||
if len(a) == 0 {
|
||||
return a
|
||||
}
|
||||
i, j := 0, 1
|
||||
for ; j < len(a); j++ {
|
||||
if a[i] == a[j] {
|
||||
continue
|
||||
}
|
||||
i++
|
||||
if i != j {
|
||||
a[i] = a[j]
|
||||
}
|
||||
}
|
||||
return a[:i+1]
|
||||
}
|
||||
|
||||
// localitiesToWeightedTarget takes a list of localities (with the same
|
||||
// priority), and generates a weighted target config, and list of addresses.
|
||||
//
|
||||
// The addresses have path hierarchy set to [priority-name, locality-name], so
|
||||
// priority and weighted target know which child policy they are for.
|
||||
func localitiesToWeightedTarget(localities []xdsclient.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig, lrsServer *string, cluster, edsService string) (*weightedtarget.LBConfig, []resolver.Address) {
|
||||
weightedTargets := make(map[string]weightedtarget.Target)
|
||||
var addrs []resolver.Address
|
||||
for _, locality := range localities {
|
||||
localityStr, err := locality.ID.ToString()
|
||||
if err != nil {
|
||||
localityStr = fmt.Sprintf("%+v", locality.ID)
|
||||
}
|
||||
|
||||
child := childPolicy
|
||||
// If lrsServer is not set, we can skip this extra layer of the LRS
|
||||
// policy.
|
||||
if lrsServer != nil {
|
||||
localityID := locality.ID
|
||||
child = &internalserviceconfig.BalancerConfig{
|
||||
Name: lrs.Name,
|
||||
Config: &lrs.LBConfig{
|
||||
ClusterName: cluster,
|
||||
EDSServiceName: edsService,
|
||||
ChildPolicy: childPolicy,
|
||||
LoadReportingServerName: *lrsServer,
|
||||
Locality: &localityID,
|
||||
},
|
||||
}
|
||||
}
|
||||
weightedTargets[localityStr] = weightedtarget.Target{Weight: locality.Weight, ChildPolicy: child}
|
||||
|
||||
for _, endpoint := range locality.Endpoints {
|
||||
// Filter out all "unhealthy" endpoints (unknown and healthy are
|
||||
// both considered to be healthy:
|
||||
// https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
|
||||
if endpoint.HealthStatus != xdsclient.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsclient.EndpointHealthStatusUnknown {
|
||||
continue
|
||||
}
|
||||
|
||||
addr := resolver.Address{Addr: endpoint.Address}
|
||||
if childPolicy.Name == weightedroundrobin.Name && endpoint.Weight != 0 {
|
||||
ai := weightedroundrobin.AddrInfo{Weight: endpoint.Weight}
|
||||
addr = weightedroundrobin.SetAddrInfo(addr, ai)
|
||||
}
|
||||
addr = hierarchy.Set(addr, []string{priorityName, localityStr})
|
||||
addrs = append(addrs, addr)
|
||||
}
|
||||
}
|
||||
return &weightedtarget.LBConfig{Targets: weightedTargets}, addrs
|
||||
}
|
||||
|
|
@ -0,0 +1,839 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package balancerconfig
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"google.golang.org/grpc/attributes"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/balancer/roundrobin"
|
||||
"google.golang.org/grpc/balancer/weightedroundrobin"
|
||||
"google.golang.org/grpc/internal/hierarchy"
|
||||
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/xds/internal"
|
||||
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
|
||||
"google.golang.org/grpc/xds/internal/balancer/lrs"
|
||||
"google.golang.org/grpc/xds/internal/balancer/priority"
|
||||
"google.golang.org/grpc/xds/internal/balancer/weightedtarget"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
)
|
||||
|
||||
const (
|
||||
testClusterName = "test-cluster-name"
|
||||
testLRSServer = "test-lrs-server"
|
||||
testMaxRequests = 314
|
||||
testEDSServcie = "test-eds-service-name"
|
||||
testEDSServiceName = "service-name-from-parent"
|
||||
testDropCategory = "test-drops"
|
||||
testDropOverMillion = 1
|
||||
|
||||
localityCount = 5
|
||||
addressPerLocality = 2
|
||||
)
|
||||
|
||||
var (
|
||||
testLocalityIDs []internal.LocalityID
|
||||
testAddressStrs [][]string
|
||||
testEndpoints [][]xdsclient.Endpoint
|
||||
|
||||
testLocalitiesP0, testLocalitiesP1 []xdsclient.Locality
|
||||
|
||||
addrCmpOpts = cmp.Options{
|
||||
cmp.AllowUnexported(attributes.Attributes{}),
|
||||
cmp.Transformer("SortAddrs", func(in []resolver.Address) []resolver.Address {
|
||||
out := append([]resolver.Address(nil), in...) // Copy input to avoid mutating it
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].Addr < out[j].Addr
|
||||
})
|
||||
return out
|
||||
})}
|
||||
)
|
||||
|
||||
func init() {
|
||||
for i := 0; i < localityCount; i++ {
|
||||
testLocalityIDs = append(testLocalityIDs, internal.LocalityID{Zone: fmt.Sprintf("test-zone-%d", i)})
|
||||
var (
|
||||
addrs []string
|
||||
ends []xdsclient.Endpoint
|
||||
)
|
||||
for j := 0; j < addressPerLocality; j++ {
|
||||
addr := fmt.Sprintf("addr-%d-%d", i, j)
|
||||
addrs = append(addrs, addr)
|
||||
ends = append(ends, xdsclient.Endpoint{
|
||||
Address: addr,
|
||||
HealthStatus: xdsclient.EndpointHealthStatusHealthy,
|
||||
})
|
||||
}
|
||||
testAddressStrs = append(testAddressStrs, addrs)
|
||||
testEndpoints = append(testEndpoints, ends)
|
||||
}
|
||||
|
||||
testLocalitiesP0 = []xdsclient.Locality{
|
||||
{
|
||||
Endpoints: testEndpoints[0],
|
||||
ID: testLocalityIDs[0],
|
||||
Weight: 20,
|
||||
Priority: 0,
|
||||
},
|
||||
{
|
||||
Endpoints: testEndpoints[1],
|
||||
ID: testLocalityIDs[1],
|
||||
Weight: 80,
|
||||
Priority: 0,
|
||||
},
|
||||
}
|
||||
testLocalitiesP1 = []xdsclient.Locality{
|
||||
{
|
||||
Endpoints: testEndpoints[2],
|
||||
ID: testLocalityIDs[2],
|
||||
Weight: 20,
|
||||
Priority: 1,
|
||||
},
|
||||
{
|
||||
Endpoints: testEndpoints[3],
|
||||
ID: testLocalityIDs[3],
|
||||
Weight: 80,
|
||||
Priority: 1,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// TestBuildPriorityConfigJSON is a sanity check that the built balancer config
|
||||
// can be parsed. The behavior test is covered by TestBuildPriorityConfig.
|
||||
func TestBuildPriorityConfigJSON(t *testing.T) {
|
||||
gotConfig, _, err := BuildPriorityConfigJSON([]PriorityConfig{
|
||||
{
|
||||
Mechanism: DiscoveryMechanism{
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
},
|
||||
EDSResp: xdsclient.EndpointsUpdate{
|
||||
Drops: []xdsclient.OverloadDropConfig{
|
||||
{
|
||||
Category: testDropCategory,
|
||||
Numerator: testDropOverMillion,
|
||||
Denominator: million,
|
||||
},
|
||||
},
|
||||
Localities: []xdsclient.Locality{
|
||||
testLocalitiesP0[0],
|
||||
testLocalitiesP0[1],
|
||||
testLocalitiesP1[0],
|
||||
testLocalitiesP1[1],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Mechanism: DiscoveryMechanism{
|
||||
Type: DiscoveryMechanismTypeLogicalDNS,
|
||||
},
|
||||
Addresses: testAddressStrs[4],
|
||||
},
|
||||
}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("buildPriorityConfigJSON(...) failed: %v", err)
|
||||
}
|
||||
|
||||
var prettyGot bytes.Buffer
|
||||
if err := json.Indent(&prettyGot, gotConfig, ">>> ", " "); err != nil {
|
||||
t.Fatalf("json.Indent() failed: %v", err)
|
||||
}
|
||||
// Print the indented json if this test fails.
|
||||
t.Log(prettyGot.String())
|
||||
|
||||
priorityB := balancer.Get(priority.Name)
|
||||
if _, err = priorityB.(balancer.ConfigParser).ParseConfig(gotConfig); err != nil {
|
||||
t.Fatalf("ParseConfig(%+v) failed: %v", gotConfig, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildPriorityConfig(t *testing.T) {
|
||||
gotConfig, gotAddrs := buildPriorityConfig([]PriorityConfig{
|
||||
{
|
||||
Mechanism: DiscoveryMechanism{
|
||||
Cluster: testClusterName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
},
|
||||
EDSResp: xdsclient.EndpointsUpdate{
|
||||
Drops: []xdsclient.OverloadDropConfig{
|
||||
{
|
||||
Category: testDropCategory,
|
||||
Numerator: testDropOverMillion,
|
||||
Denominator: million,
|
||||
},
|
||||
},
|
||||
Localities: []xdsclient.Locality{
|
||||
testLocalitiesP0[0],
|
||||
testLocalitiesP0[1],
|
||||
testLocalitiesP1[0],
|
||||
testLocalitiesP1[1],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Mechanism: DiscoveryMechanism{
|
||||
Type: DiscoveryMechanismTypeLogicalDNS,
|
||||
},
|
||||
Addresses: testAddressStrs[4],
|
||||
},
|
||||
}, nil)
|
||||
|
||||
wantConfig := &priority.LBConfig{
|
||||
Children: map[string]*priority.Child{
|
||||
"priority-0-0": {
|
||||
Config: &internalserviceconfig.BalancerConfig{
|
||||
Name: clusterimpl.Name,
|
||||
Config: &clusterimpl.LBConfig{
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
DropCategories: []clusterimpl.DropConfig{
|
||||
{
|
||||
Category: testDropCategory,
|
||||
RequestsPerMillion: testDropOverMillion,
|
||||
},
|
||||
},
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: weightedtarget.Name,
|
||||
Config: &weightedtarget.LBConfig{
|
||||
Targets: map[string]weightedtarget.Target{
|
||||
assertString(testLocalityIDs[0].ToString): {
|
||||
Weight: 20,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: lrs.Name,
|
||||
Config: &lrs.LBConfig{
|
||||
ClusterName: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: testLRSServer,
|
||||
Locality: &testLocalityIDs[0],
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
assertString(testLocalityIDs[1].ToString): {
|
||||
Weight: 80,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: lrs.Name,
|
||||
Config: &lrs.LBConfig{
|
||||
ClusterName: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: testLRSServer,
|
||||
Locality: &testLocalityIDs[1],
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
IgnoreReresolutionRequests: true,
|
||||
},
|
||||
"priority-0-1": {
|
||||
Config: &internalserviceconfig.BalancerConfig{
|
||||
Name: clusterimpl.Name,
|
||||
Config: &clusterimpl.LBConfig{
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
DropCategories: []clusterimpl.DropConfig{
|
||||
{
|
||||
Category: testDropCategory,
|
||||
RequestsPerMillion: testDropOverMillion,
|
||||
},
|
||||
},
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: weightedtarget.Name,
|
||||
Config: &weightedtarget.LBConfig{
|
||||
Targets: map[string]weightedtarget.Target{
|
||||
assertString(testLocalityIDs[2].ToString): {
|
||||
Weight: 20,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: lrs.Name,
|
||||
Config: &lrs.LBConfig{
|
||||
ClusterName: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: testLRSServer,
|
||||
Locality: &testLocalityIDs[2],
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
assertString(testLocalityIDs[3].ToString): {
|
||||
Weight: 80,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: lrs.Name,
|
||||
Config: &lrs.LBConfig{
|
||||
ClusterName: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: testLRSServer,
|
||||
Locality: &testLocalityIDs[3],
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
IgnoreReresolutionRequests: true,
|
||||
},
|
||||
"priority-1": {
|
||||
Config: &internalserviceconfig.BalancerConfig{
|
||||
Name: clusterimpl.Name,
|
||||
Config: &clusterimpl.LBConfig{
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: "pick_first"},
|
||||
},
|
||||
},
|
||||
IgnoreReresolutionRequests: false,
|
||||
},
|
||||
},
|
||||
Priorities: []string{"priority-0-0", "priority-0-1", "priority-1"},
|
||||
}
|
||||
wantAddrs := []resolver.Address{
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][0]}, []string{"priority-0-0", assertString(testLocalityIDs[0].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][1]}, []string{"priority-0-0", assertString(testLocalityIDs[0].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][0]}, []string{"priority-0-0", assertString(testLocalityIDs[1].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][1]}, []string{"priority-0-0", assertString(testLocalityIDs[1].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][0]}, []string{"priority-0-1", assertString(testLocalityIDs[2].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][1]}, []string{"priority-0-1", assertString(testLocalityIDs[2].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][0]}, []string{"priority-0-1", assertString(testLocalityIDs[3].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][1]}, []string{"priority-0-1", assertString(testLocalityIDs[3].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[4][0]}, []string{"priority-1"}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[4][1]}, []string{"priority-1"}),
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(gotConfig, wantConfig); diff != "" {
|
||||
t.Errorf("buildPriorityConfig() diff (-got +want) %v", diff)
|
||||
}
|
||||
if diff := cmp.Diff(gotAddrs, wantAddrs, addrCmpOpts); diff != "" {
|
||||
t.Errorf("buildPriorityConfig() diff (-got +want) %v", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildClusterImplConfigForDNS(t *testing.T) {
|
||||
gotName, gotConfig, gotAddrs := buildClusterImplConfigForDNS(3, testAddressStrs[0])
|
||||
wantName := "priority-3"
|
||||
wantConfig := &clusterimpl.LBConfig{
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: "pick_first",
|
||||
},
|
||||
}
|
||||
wantAddrs := []resolver.Address{
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][0]}, []string{"priority-3"}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][1]}, []string{"priority-3"}),
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(gotName, wantName); diff != "" {
|
||||
t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff)
|
||||
}
|
||||
if diff := cmp.Diff(gotConfig, wantConfig); diff != "" {
|
||||
t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff)
|
||||
}
|
||||
if diff := cmp.Diff(gotAddrs, wantAddrs, addrCmpOpts); diff != "" {
|
||||
t.Errorf("buildClusterImplConfigForDNS() diff (-got +want) %v", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildClusterImplConfigForEDS(t *testing.T) {
|
||||
gotNames, gotConfigs, gotAddrs := buildClusterImplConfigForEDS(
|
||||
2,
|
||||
xdsclient.EndpointsUpdate{
|
||||
Drops: []xdsclient.OverloadDropConfig{
|
||||
{
|
||||
Category: testDropCategory,
|
||||
Numerator: testDropOverMillion,
|
||||
Denominator: million,
|
||||
},
|
||||
},
|
||||
Localities: []xdsclient.Locality{
|
||||
{
|
||||
Endpoints: testEndpoints[3],
|
||||
ID: testLocalityIDs[3],
|
||||
Weight: 80,
|
||||
Priority: 1,
|
||||
}, {
|
||||
Endpoints: testEndpoints[1],
|
||||
ID: testLocalityIDs[1],
|
||||
Weight: 80,
|
||||
Priority: 0,
|
||||
}, {
|
||||
Endpoints: testEndpoints[2],
|
||||
ID: testLocalityIDs[2],
|
||||
Weight: 20,
|
||||
Priority: 1,
|
||||
}, {
|
||||
Endpoints: testEndpoints[0],
|
||||
ID: testLocalityIDs[0],
|
||||
Weight: 20,
|
||||
Priority: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
DiscoveryMechanism{
|
||||
Cluster: testClusterName,
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
},
|
||||
nil,
|
||||
)
|
||||
|
||||
wantNames := []string{
|
||||
fmt.Sprintf("priority-%v-%v", 2, 0),
|
||||
fmt.Sprintf("priority-%v-%v", 2, 1),
|
||||
}
|
||||
wantConfigs := map[string]*clusterimpl.LBConfig{
|
||||
"priority-2-0": {
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
DropCategories: []clusterimpl.DropConfig{
|
||||
{
|
||||
Category: testDropCategory,
|
||||
RequestsPerMillion: testDropOverMillion,
|
||||
},
|
||||
},
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: weightedtarget.Name,
|
||||
Config: &weightedtarget.LBConfig{
|
||||
Targets: map[string]weightedtarget.Target{
|
||||
assertString(testLocalityIDs[0].ToString): {
|
||||
Weight: 20,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: lrs.Name,
|
||||
Config: &lrs.LBConfig{
|
||||
ClusterName: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: testLRSServer,
|
||||
Locality: &testLocalityIDs[0],
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
assertString(testLocalityIDs[1].ToString): {
|
||||
Weight: 80,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: lrs.Name,
|
||||
Config: &lrs.LBConfig{
|
||||
ClusterName: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: testLRSServer,
|
||||
Locality: &testLocalityIDs[1],
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"priority-2-1": {
|
||||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: newString(testLRSServer),
|
||||
MaxConcurrentRequests: newUint32(testMaxRequests),
|
||||
DropCategories: []clusterimpl.DropConfig{
|
||||
{
|
||||
Category: testDropCategory,
|
||||
RequestsPerMillion: testDropOverMillion,
|
||||
},
|
||||
},
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: weightedtarget.Name,
|
||||
Config: &weightedtarget.LBConfig{
|
||||
Targets: map[string]weightedtarget.Target{
|
||||
assertString(testLocalityIDs[2].ToString): {
|
||||
Weight: 20,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: lrs.Name,
|
||||
Config: &lrs.LBConfig{
|
||||
ClusterName: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: testLRSServer,
|
||||
Locality: &testLocalityIDs[2],
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
assertString(testLocalityIDs[3].ToString): {
|
||||
Weight: 80,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: lrs.Name,
|
||||
Config: &lrs.LBConfig{
|
||||
ClusterName: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
LoadReportingServerName: testLRSServer,
|
||||
Locality: &testLocalityIDs[3],
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
wantAddrs := []resolver.Address{
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][0]}, []string{"priority-2-0", assertString(testLocalityIDs[0].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[0][1]}, []string{"priority-2-0", assertString(testLocalityIDs[0].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][0]}, []string{"priority-2-0", assertString(testLocalityIDs[1].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[1][1]}, []string{"priority-2-0", assertString(testLocalityIDs[1].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][0]}, []string{"priority-2-1", assertString(testLocalityIDs[2].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[2][1]}, []string{"priority-2-1", assertString(testLocalityIDs[2].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][0]}, []string{"priority-2-1", assertString(testLocalityIDs[3].ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: testAddressStrs[3][1]}, []string{"priority-2-1", assertString(testLocalityIDs[3].ToString)}),
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(gotNames, wantNames); diff != "" {
|
||||
t.Errorf("buildClusterImplConfigForEDS() diff (-got +want) %v", diff)
|
||||
}
|
||||
if diff := cmp.Diff(gotConfigs, wantConfigs); diff != "" {
|
||||
t.Errorf("buildClusterImplConfigForEDS() diff (-got +want) %v", diff)
|
||||
}
|
||||
if diff := cmp.Diff(gotAddrs, wantAddrs, addrCmpOpts); diff != "" {
|
||||
t.Errorf("buildClusterImplConfigForEDS() diff (-got +want) %v", diff)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestGroupLocalitiesByPriority(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
localities []xdsclient.Locality
|
||||
wantPriorities []string
|
||||
wantLocalities map[string][]xdsclient.Locality
|
||||
}{
|
||||
{
|
||||
name: "1 locality 1 priority",
|
||||
localities: []xdsclient.Locality{testLocalitiesP0[0]},
|
||||
wantPriorities: []string{"0"},
|
||||
wantLocalities: map[string][]xdsclient.Locality{
|
||||
"0": {testLocalitiesP0[0]},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "2 locality 1 priority",
|
||||
localities: []xdsclient.Locality{testLocalitiesP0[0], testLocalitiesP0[1]},
|
||||
wantPriorities: []string{"0"},
|
||||
wantLocalities: map[string][]xdsclient.Locality{
|
||||
"0": {testLocalitiesP0[0], testLocalitiesP0[1]},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "1 locality in each",
|
||||
localities: []xdsclient.Locality{testLocalitiesP0[0], testLocalitiesP1[0]},
|
||||
wantPriorities: []string{"0", "1"},
|
||||
wantLocalities: map[string][]xdsclient.Locality{
|
||||
"0": {testLocalitiesP0[0]},
|
||||
"1": {testLocalitiesP1[0]},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "2 localities in each sorted",
|
||||
localities: []xdsclient.Locality{
|
||||
testLocalitiesP0[0], testLocalitiesP0[1],
|
||||
testLocalitiesP1[0], testLocalitiesP1[1]},
|
||||
wantPriorities: []string{"0", "1"},
|
||||
wantLocalities: map[string][]xdsclient.Locality{
|
||||
"0": {testLocalitiesP0[0], testLocalitiesP0[1]},
|
||||
"1": {testLocalitiesP1[0], testLocalitiesP1[1]},
|
||||
},
|
||||
},
|
||||
{
|
||||
// The localities are given in order [p1, p0, p1, p0], but the
|
||||
// returned priority list must be sorted [p0, p1], because the list
|
||||
// order is the priority order.
|
||||
name: "2 localities in each needs to sort",
|
||||
localities: []xdsclient.Locality{
|
||||
testLocalitiesP1[1], testLocalitiesP0[1],
|
||||
testLocalitiesP1[0], testLocalitiesP0[0]},
|
||||
wantPriorities: []string{"0", "1"},
|
||||
wantLocalities: map[string][]xdsclient.Locality{
|
||||
"0": {testLocalitiesP0[1], testLocalitiesP0[0]},
|
||||
"1": {testLocalitiesP1[1], testLocalitiesP1[0]},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
gotPriorities, gotLocalities := groupLocalitiesByPriority(tt.localities)
|
||||
if diff := cmp.Diff(gotPriorities, tt.wantPriorities); diff != "" {
|
||||
t.Errorf("groupLocalitiesByPriority() diff(-got +want) %v", diff)
|
||||
}
|
||||
if diff := cmp.Diff(gotLocalities, tt.wantLocalities); diff != "" {
|
||||
t.Errorf("groupLocalitiesByPriority() diff(-got +want) %v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDedupSortedIntSlice(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
a []int
|
||||
want []int
|
||||
}{
|
||||
{
|
||||
name: "empty",
|
||||
a: []int{},
|
||||
want: []int{},
|
||||
},
|
||||
{
|
||||
name: "no dup",
|
||||
a: []int{0, 1, 2, 3},
|
||||
want: []int{0, 1, 2, 3},
|
||||
},
|
||||
{
|
||||
name: "with dup",
|
||||
a: []int{0, 0, 1, 1, 1, 2, 3},
|
||||
want: []int{0, 1, 2, 3},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := dedupSortedIntSlice(tt.a); !cmp.Equal(got, tt.want) {
|
||||
t.Errorf("dedupSortedIntSlice() = %v, want %v, diff %v", got, tt.want, cmp.Diff(got, tt.want))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalitiesToWeightedTarget(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
localities []xdsclient.Locality
|
||||
priorityName string
|
||||
childPolicy *internalserviceconfig.BalancerConfig
|
||||
lrsServer *string
|
||||
cluster string
|
||||
edsService string
|
||||
wantConfig *weightedtarget.LBConfig
|
||||
wantAddrs []resolver.Address
|
||||
}{
|
||||
{
|
||||
name: "roundrobin as child, with LRS",
|
||||
localities: []xdsclient.Locality{
|
||||
{
|
||||
Endpoints: []xdsclient.Endpoint{
|
||||
{Address: "addr-1-1", HealthStatus: xdsclient.EndpointHealthStatusHealthy},
|
||||
{Address: "addr-1-2", HealthStatus: xdsclient.EndpointHealthStatusHealthy},
|
||||
},
|
||||
ID: internal.LocalityID{Zone: "test-zone-1"},
|
||||
Weight: 20,
|
||||
},
|
||||
{
|
||||
Endpoints: []xdsclient.Endpoint{
|
||||
{Address: "addr-2-1", HealthStatus: xdsclient.EndpointHealthStatusHealthy},
|
||||
{Address: "addr-2-2", HealthStatus: xdsclient.EndpointHealthStatusHealthy},
|
||||
},
|
||||
ID: internal.LocalityID{Zone: "test-zone-2"},
|
||||
Weight: 80,
|
||||
},
|
||||
},
|
||||
priorityName: "test-priority",
|
||||
childPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
|
||||
lrsServer: newString("test-lrs-server"),
|
||||
cluster: "test-cluster",
|
||||
edsService: "test-eds-service",
|
||||
wantConfig: &weightedtarget.LBConfig{
|
||||
Targets: map[string]weightedtarget.Target{
|
||||
assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): {
|
||||
Weight: 20,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: lrs.Name,
|
||||
Config: &lrs.LBConfig{
|
||||
ClusterName: "test-cluster",
|
||||
EDSServiceName: "test-eds-service",
|
||||
LoadReportingServerName: "test-lrs-server",
|
||||
Locality: &internal.LocalityID{Zone: "test-zone-1"},
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): {
|
||||
Weight: 80,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: lrs.Name,
|
||||
Config: &lrs.LBConfig{
|
||||
ClusterName: "test-cluster",
|
||||
EDSServiceName: "test-eds-service",
|
||||
LoadReportingServerName: "test-lrs-server",
|
||||
Locality: &internal.LocalityID{Zone: "test-zone-2"},
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
wantAddrs: []resolver.Address{
|
||||
hierarchy.Set(resolver.Address{Addr: "addr-1-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: "addr-1-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: "addr-2-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: "addr-2-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "roundrobin as child, no LRS",
|
||||
localities: []xdsclient.Locality{
|
||||
{
|
||||
Endpoints: []xdsclient.Endpoint{
|
||||
{Address: "addr-1-1", HealthStatus: xdsclient.EndpointHealthStatusHealthy},
|
||||
{Address: "addr-1-2", HealthStatus: xdsclient.EndpointHealthStatusHealthy},
|
||||
},
|
||||
ID: internal.LocalityID{Zone: "test-zone-1"},
|
||||
Weight: 20,
|
||||
},
|
||||
{
|
||||
Endpoints: []xdsclient.Endpoint{
|
||||
{Address: "addr-2-1", HealthStatus: xdsclient.EndpointHealthStatusHealthy},
|
||||
{Address: "addr-2-2", HealthStatus: xdsclient.EndpointHealthStatusHealthy},
|
||||
},
|
||||
ID: internal.LocalityID{Zone: "test-zone-2"},
|
||||
Weight: 80,
|
||||
},
|
||||
},
|
||||
priorityName: "test-priority",
|
||||
childPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
|
||||
// lrsServer is nil, so LRS policy will not be used.
|
||||
wantConfig: &weightedtarget.LBConfig{
|
||||
Targets: map[string]weightedtarget.Target{
|
||||
assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): {
|
||||
Weight: 20,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: roundrobin.Name,
|
||||
},
|
||||
},
|
||||
assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): {
|
||||
Weight: 80,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: roundrobin.Name,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
wantAddrs: []resolver.Address{
|
||||
hierarchy.Set(resolver.Address{Addr: "addr-1-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: "addr-1-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: "addr-2-1"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}),
|
||||
hierarchy.Set(resolver.Address{Addr: "addr-2-2"}, []string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "weighted round robin as child, no LRS",
|
||||
localities: []xdsclient.Locality{
|
||||
{
|
||||
Endpoints: []xdsclient.Endpoint{
|
||||
{Address: "addr-1-1", HealthStatus: xdsclient.EndpointHealthStatusHealthy, Weight: 90},
|
||||
{Address: "addr-1-2", HealthStatus: xdsclient.EndpointHealthStatusHealthy, Weight: 10},
|
||||
},
|
||||
ID: internal.LocalityID{Zone: "test-zone-1"},
|
||||
Weight: 20,
|
||||
},
|
||||
{
|
||||
Endpoints: []xdsclient.Endpoint{
|
||||
{Address: "addr-2-1", HealthStatus: xdsclient.EndpointHealthStatusHealthy, Weight: 90},
|
||||
{Address: "addr-2-2", HealthStatus: xdsclient.EndpointHealthStatusHealthy, Weight: 10},
|
||||
},
|
||||
ID: internal.LocalityID{Zone: "test-zone-2"},
|
||||
Weight: 80,
|
||||
},
|
||||
},
|
||||
priorityName: "test-priority",
|
||||
childPolicy: &internalserviceconfig.BalancerConfig{Name: weightedroundrobin.Name},
|
||||
// lrsServer is nil, so LRS policy will not be used.
|
||||
wantConfig: &weightedtarget.LBConfig{
|
||||
Targets: map[string]weightedtarget.Target{
|
||||
assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): {
|
||||
Weight: 20,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: weightedroundrobin.Name,
|
||||
},
|
||||
},
|
||||
assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): {
|
||||
Weight: 80,
|
||||
ChildPolicy: &internalserviceconfig.BalancerConfig{
|
||||
Name: weightedroundrobin.Name,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
wantAddrs: []resolver.Address{
|
||||
hierarchy.Set(
|
||||
weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-1-1"}, weightedroundrobin.AddrInfo{Weight: 90}),
|
||||
[]string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}),
|
||||
hierarchy.Set(
|
||||
weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-1-2"}, weightedroundrobin.AddrInfo{Weight: 10}),
|
||||
[]string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString)}),
|
||||
hierarchy.Set(
|
||||
weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-2-1"}, weightedroundrobin.AddrInfo{Weight: 90}),
|
||||
[]string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}),
|
||||
hierarchy.Set(
|
||||
weightedroundrobin.SetAddrInfo(resolver.Address{Addr: "addr-2-2"}, weightedroundrobin.AddrInfo{Weight: 10}),
|
||||
[]string{"test-priority", assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString)}),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, got1 := localitiesToWeightedTarget(tt.localities, tt.priorityName, tt.childPolicy, tt.lrsServer, tt.cluster, tt.edsService)
|
||||
if diff := cmp.Diff(got, tt.wantConfig); diff != "" {
|
||||
t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff)
|
||||
}
|
||||
if diff := cmp.Diff(got1, tt.wantAddrs, cmp.AllowUnexported(attributes.Attributes{})); diff != "" {
|
||||
t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newString(s string) *string {
|
||||
return &s
|
||||
}
|
||||
|
||||
func newUint32(i uint32) *uint32 {
|
||||
return &i
|
||||
}
|
||||
|
||||
func assertString(f func() (string, error)) string {
|
||||
s, err := f()
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package balancerconfig
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// DiscoveryMechanismType is the type of discovery mechanism.
|
||||
type DiscoveryMechanismType int
|
||||
|
||||
const (
|
||||
// DiscoveryMechanismTypeEDS is eds.
|
||||
DiscoveryMechanismTypeEDS DiscoveryMechanismType = iota // `json:EDS`
|
||||
// DiscoveryMechanismTypeLogicalDNS is DNS.
|
||||
DiscoveryMechanismTypeLogicalDNS // `json:LOGICAL_DNS`
|
||||
)
|
||||
|
||||
// MarshalJSON marshals a DiscoveryMechanismType to a quoted json string.
|
||||
//
|
||||
// This is necessary to handle enum (as strings) from JSON.
|
||||
func (t *DiscoveryMechanismType) MarshalJSON() ([]byte, error) {
|
||||
buffer := bytes.NewBufferString(`"`)
|
||||
switch *t {
|
||||
case DiscoveryMechanismTypeEDS:
|
||||
buffer.WriteString("EDS")
|
||||
case DiscoveryMechanismTypeLogicalDNS:
|
||||
buffer.WriteString("LOGICAL_DNS")
|
||||
}
|
||||
buffer.WriteString(`"`)
|
||||
return buffer.Bytes(), nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON unmarshals a quoted json string to the DiscoveryMechanismType.
|
||||
func (t *DiscoveryMechanismType) UnmarshalJSON(b []byte) error {
|
||||
var s string
|
||||
err := json.Unmarshal(b, &s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch s {
|
||||
case "EDS":
|
||||
*t = DiscoveryMechanismTypeEDS
|
||||
case "LOGICAL_DNS":
|
||||
*t = DiscoveryMechanismTypeLogicalDNS
|
||||
default:
|
||||
return fmt.Errorf("unable to unmarshal string %q to type DiscoveryMechanismType", s)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DiscoveryMechanism is the discovery mechanism, can be either EDS or DNS.
|
||||
//
|
||||
// For DNS, the ClientConn target will be used for name resolution.
|
||||
//
|
||||
// For EDS, if EDSServiceName is not empty, it will be used for watching. If
|
||||
// EDSServiceName is empty, Cluster will be used.
|
||||
type DiscoveryMechanism struct {
|
||||
// Cluster is the cluster name.
|
||||
Cluster string `json:"cluster,omitempty"`
|
||||
// LoadReportingServerName is the LRS server to send load reports to. If
|
||||
// not present, load reporting will be disabled. If set to the empty string,
|
||||
// load reporting will be sent to the same server that we obtained CDS data
|
||||
// from.
|
||||
LoadReportingServerName *string `json:"lrsLoadReportingServerName,omitempty"`
|
||||
// MaxConcurrentRequests is the maximum number of outstanding requests can
|
||||
// be made to the upstream cluster. Default is 1024.
|
||||
MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"`
|
||||
// Type is the discovery mechanism type.
|
||||
Type DiscoveryMechanismType `json:"type,omitempty"`
|
||||
// EDSServiceName is the EDS service name, as returned in CDS. May be unset
|
||||
// if not specified in CDS. For type EDS only.
|
||||
//
|
||||
// This is used for EDS watch if set. If unset, Cluster is used for EDS
|
||||
// watch.
|
||||
EDSServiceName string `json:"edsServiceName,omitempty"`
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package edsbalancer
|
||||
|
||||
import (
|
||||
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
)
|
||||
|
||||
const million = 1000000
|
||||
|
||||
func buildPriorityConfigJSON(edsResp xdsclient.EndpointsUpdate, c *EDSConfig) ([]byte, []resolver.Address, error) {
|
||||
var childConfig *internalserviceconfig.BalancerConfig
|
||||
if c.ChildPolicy != nil {
|
||||
childConfig = &internalserviceconfig.BalancerConfig{Name: c.ChildPolicy.Name}
|
||||
}
|
||||
return balancerconfig.BuildPriorityConfigJSON(
|
||||
[]balancerconfig.PriorityConfig{
|
||||
{
|
||||
Mechanism: balancerconfig.DiscoveryMechanism{
|
||||
Cluster: c.ClusterName,
|
||||
LoadReportingServerName: c.LrsLoadReportingServerName,
|
||||
MaxConcurrentRequests: c.MaxConcurrentRequests,
|
||||
Type: balancerconfig.DiscoveryMechanismTypeEDS,
|
||||
EDSServiceName: c.EDSServiceName,
|
||||
},
|
||||
EDSResp: edsResp,
|
||||
},
|
||||
}, childConfig,
|
||||
)
|
||||
}
|
||||
|
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package edsbalancer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/balancer/roundrobin"
|
||||
"google.golang.org/grpc/xds/internal"
|
||||
"google.golang.org/grpc/xds/internal/balancer/priority"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
|
||||
_ "google.golang.org/grpc/xds/internal/balancer/clustermanager" // Register the xds_cluster_manager balancer
|
||||
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget" // Register the weighted_target balancer
|
||||
)
|
||||
|
||||
const (
|
||||
localityCount = 4
|
||||
addressPerLocality = 2
|
||||
)
|
||||
|
||||
var (
|
||||
testLocalityIDs []internal.LocalityID
|
||||
testEndpoints [][]xdsclient.Endpoint
|
||||
)
|
||||
|
||||
func init() {
|
||||
for i := 0; i < localityCount; i++ {
|
||||
testLocalityIDs = append(testLocalityIDs, internal.LocalityID{Zone: fmt.Sprintf("test-zone-%d", i)})
|
||||
var ends []xdsclient.Endpoint
|
||||
for j := 0; j < addressPerLocality; j++ {
|
||||
addr := fmt.Sprintf("addr-%d-%d", i, j)
|
||||
ends = append(ends, xdsclient.Endpoint{
|
||||
Address: addr,
|
||||
HealthStatus: xdsclient.EndpointHealthStatusHealthy,
|
||||
})
|
||||
}
|
||||
testEndpoints = append(testEndpoints, ends)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBuildPriorityConfigJSON is a sanity check that the generated config bytes
|
||||
// are valid (can be parsed back to a config struct).
|
||||
//
|
||||
// The correctness is covered by the unmarshalled version
|
||||
// TestBuildPriorityConfig.
|
||||
func TestBuildPriorityConfigJSON(t *testing.T) {
|
||||
const (
|
||||
testClusterName = "cluster-name-for-watch"
|
||||
testEDSServiceName = "service-name-from-parent"
|
||||
testLRSServer = "lrs-addr-from-config"
|
||||
testMaxReq = 314
|
||||
testDropCategory = "test-drops"
|
||||
testDropOverMillion = 1
|
||||
)
|
||||
for _, lrsServer := range []*string{newString(testLRSServer), newString(""), nil} {
|
||||
got, _, err := buildPriorityConfigJSON(xdsclient.EndpointsUpdate{
|
||||
Drops: []xdsclient.OverloadDropConfig{{
|
||||
Category: testDropCategory,
|
||||
Numerator: testDropOverMillion,
|
||||
Denominator: million,
|
||||
}},
|
||||
Localities: []xdsclient.Locality{{
|
||||
Endpoints: testEndpoints[3],
|
||||
ID: testLocalityIDs[3],
|
||||
Weight: 80,
|
||||
Priority: 1,
|
||||
}, {
|
||||
Endpoints: testEndpoints[1],
|
||||
ID: testLocalityIDs[1],
|
||||
Weight: 80,
|
||||
Priority: 0,
|
||||
}, {
|
||||
Endpoints: testEndpoints[2],
|
||||
ID: testLocalityIDs[2],
|
||||
Weight: 20,
|
||||
Priority: 1,
|
||||
}, {
|
||||
Endpoints: testEndpoints[0],
|
||||
ID: testLocalityIDs[0],
|
||||
Weight: 20,
|
||||
Priority: 0,
|
||||
}}},
|
||||
&EDSConfig{
|
||||
ChildPolicy: &loadBalancingConfig{Name: roundrobin.Name},
|
||||
ClusterName: testClusterName,
|
||||
EDSServiceName: testEDSServiceName,
|
||||
MaxConcurrentRequests: newUint32(testMaxReq),
|
||||
LrsLoadReportingServerName: lrsServer,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("buildPriorityConfigJSON(...) failed: %v", err)
|
||||
}
|
||||
priorityB := balancer.Get(priority.Name)
|
||||
if _, err = priorityB.(balancer.ConfigParser).ParseConfig(got); err != nil {
|
||||
t.Fatalf("ParseConfig(%+v) failed: %v", got, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newString(s string) *string {
|
||||
return &s
|
||||
}
|
||||
|
||||
func newUint32(i uint32) *uint32 {
|
||||
return &i
|
||||
}
|
||||
Loading…
Reference in New Issue