mirror of https://github.com/grpc/grpc-go.git
xds/clusterresolver: reuse child policy names for the same locality (#5367)
This commit is contained in:
parent
6e253e8afa
commit
da6ef0055c
|
|
@ -51,6 +51,9 @@ type priorityConfig struct {
|
|||
edsResp xdsresource.EndpointsUpdate
|
||||
// addresses is set only if type is DNS.
|
||||
addresses []string
|
||||
// Each discovery mechanism has a name generator so that the child policies
|
||||
// can reuse names between updates (EDS updates for example).
|
||||
childNameGen *nameGenerator
|
||||
}
|
||||
|
||||
// buildPriorityConfigJSON builds balancer config for the passed in
|
||||
|
|
@ -118,10 +121,10 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi
|
|||
retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)}
|
||||
retAddrs []resolver.Address
|
||||
)
|
||||
for i, p := range priorities {
|
||||
for _, p := range priorities {
|
||||
switch p.mechanism.Type {
|
||||
case DiscoveryMechanismTypeEDS:
|
||||
names, configs, addrs, err := buildClusterImplConfigForEDS(i, p.edsResp, p.mechanism, xdsLBPolicy)
|
||||
names, configs, addrs, err := buildClusterImplConfigForEDS(p.childNameGen, p.edsResp, p.mechanism, xdsLBPolicy)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
@ -135,7 +138,7 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi
|
|||
}
|
||||
retAddrs = append(retAddrs, addrs...)
|
||||
case DiscoveryMechanismTypeLogicalDNS:
|
||||
name, config, addrs := buildClusterImplConfigForDNS(i, p.addresses, p.mechanism)
|
||||
name, config, addrs := buildClusterImplConfigForDNS(p.childNameGen, p.addresses, p.mechanism)
|
||||
retConfig.Priorities = append(retConfig.Priorities, name)
|
||||
retConfig.Children[name] = &priority.Child{
|
||||
Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: config},
|
||||
|
|
@ -149,11 +152,11 @@ func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalservi
|
|||
return retConfig, retAddrs, nil
|
||||
}
|
||||
|
||||
func buildClusterImplConfigForDNS(parentPriority int, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Address) {
|
||||
func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Address) {
|
||||
// Endpoint picking policy for DNS is hardcoded to pick_first.
|
||||
const childPolicy = "pick_first"
|
||||
retAddrs := make([]resolver.Address, 0, len(addrStrs))
|
||||
pName := fmt.Sprintf("priority-%v", parentPriority)
|
||||
pName := fmt.Sprintf("priority-%v", g.prefix)
|
||||
for _, addrStr := range addrStrs {
|
||||
retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
|
||||
}
|
||||
|
|
@ -172,7 +175,7 @@ func buildClusterImplConfigForDNS(parentPriority int, addrStrs []string, mechani
|
|||
// - map{"p0":p0_config, "p1":p1_config}
|
||||
// - [p0_address_0, p0_address_1, p1_address_0, p1_address_1]
|
||||
// - p0 addresses' hierarchy attributes are set to p0
|
||||
func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) {
|
||||
func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) {
|
||||
drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops))
|
||||
for _, d := range edsResp.Drops {
|
||||
drops = append(drops, clusterimpl.DropConfig{
|
||||
|
|
@ -181,15 +184,12 @@ func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsresource.Endpoi
|
|||
})
|
||||
}
|
||||
|
||||
priorityChildNames, priorities := groupLocalitiesByPriority(edsResp.Localities)
|
||||
retNames := make([]string, 0, len(priorityChildNames))
|
||||
retAddrs := make([]resolver.Address, 0, len(priorityChildNames))
|
||||
retConfigs := make(map[string]*clusterimpl.LBConfig, len(priorityChildNames))
|
||||
for _, priorityName := range priorityChildNames {
|
||||
priorityLocalities := priorities[priorityName]
|
||||
// Prepend parent priority to the priority names, to avoid duplicates.
|
||||
pName := fmt.Sprintf("priority-%v-%v", parentPriority, priorityName)
|
||||
retNames = append(retNames, pName)
|
||||
priorities := groupLocalitiesByPriority(edsResp.Localities)
|
||||
retNames := g.generate(priorities)
|
||||
retConfigs := make(map[string]*clusterimpl.LBConfig, len(retNames))
|
||||
var retAddrs []resolver.Address
|
||||
for i, pName := range retNames {
|
||||
priorityLocalities := priorities[i]
|
||||
cfg, addrs, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
|
|
@ -202,33 +202,32 @@ func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsresource.Endpoi
|
|||
|
||||
// groupLocalitiesByPriority returns the localities grouped by priority.
|
||||
//
|
||||
// It also returns a list of strings where each string represents a priority,
|
||||
// and the list is sorted from higher priority to lower priority.
|
||||
// The returned list is sorted from higher priority to lower. Each item in the
|
||||
// list is a group of localities.
|
||||
//
|
||||
// For example, for L0-p0, L1-p0, L2-p1, results will be
|
||||
// - ["p0", "p1"]
|
||||
// - map{"p0":[L0, L1], "p1":[L2]}
|
||||
func groupLocalitiesByPriority(localities []xdsresource.Locality) ([]string, map[string][]xdsresource.Locality) {
|
||||
// - [[L0, L1], [L2]]
|
||||
func groupLocalitiesByPriority(localities []xdsresource.Locality) [][]xdsresource.Locality {
|
||||
var priorityIntSlice []int
|
||||
priorities := make(map[string][]xdsresource.Locality)
|
||||
priorities := make(map[int][]xdsresource.Locality)
|
||||
for _, locality := range localities {
|
||||
if locality.Weight == 0 {
|
||||
continue
|
||||
}
|
||||
priorityName := fmt.Sprintf("%v", locality.Priority)
|
||||
priorities[priorityName] = append(priorities[priorityName], locality)
|
||||
priorityIntSlice = append(priorityIntSlice, int(locality.Priority))
|
||||
priority := int(locality.Priority)
|
||||
priorities[priority] = append(priorities[priority], locality)
|
||||
priorityIntSlice = append(priorityIntSlice, priority)
|
||||
}
|
||||
// Sort the priorities based on the int value, deduplicate, and then turn
|
||||
// the sorted list into a string list. This will be child names, in priority
|
||||
// order.
|
||||
sort.Ints(priorityIntSlice)
|
||||
priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice)
|
||||
priorityNameSlice := make([]string, 0, len(priorityIntSliceDeduped))
|
||||
ret := make([][]xdsresource.Locality, 0, len(priorityIntSliceDeduped))
|
||||
for _, p := range priorityIntSliceDeduped {
|
||||
priorityNameSlice = append(priorityNameSlice, fmt.Sprintf("%v", p))
|
||||
ret = append(ret, priorities[p])
|
||||
}
|
||||
return priorityNameSlice, priorities
|
||||
return ret
|
||||
}
|
||||
|
||||
func dedupSortedIntSlice(a []int) []int {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package clusterresolver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"google.golang.org/grpc/xds/internal"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
|
||||
)
|
||||
|
||||
// nameGenerator generates a child name for a list of priorities (each priority
|
||||
// is a list of localities).
|
||||
//
|
||||
// The purpose of this generator is to reuse names between updates. So the
|
||||
// struct keeps state between generate() calls, and a later generate() might
|
||||
// return names returned by the previous call.
|
||||
type nameGenerator struct {
|
||||
existingNames map[internal.LocalityID]string
|
||||
prefix uint64
|
||||
nextID uint64
|
||||
}
|
||||
|
||||
func newNameGenerator(prefix uint64) *nameGenerator {
|
||||
return &nameGenerator{prefix: prefix}
|
||||
}
|
||||
|
||||
// generate returns a list of names for the given list of priorities.
|
||||
//
|
||||
// Each priority is a list of localities. The name for the priority is picked as
|
||||
// - for each locality in this priority, if it exists in the existing names,
|
||||
// this priority will reuse the name
|
||||
// - if no reusable name is found for this priority, a new name is generated
|
||||
//
|
||||
// For example:
|
||||
// - update 1: [[L1], [L2], [L3]] --> ["0", "1", "2"]
|
||||
// - update 2: [[L1], [L2], [L3]] --> ["0", "1", "2"]
|
||||
// - update 3: [[L1, L2], [L3]] --> ["0", "2"] (Two priorities were merged)
|
||||
// - update 4: [[L1], [L4]] --> ["0", "3",] (A priority was split, and a new priority was added)
|
||||
func (ng *nameGenerator) generate(priorities [][]xdsresource.Locality) []string {
|
||||
var ret []string
|
||||
usedNames := make(map[string]bool)
|
||||
newNames := make(map[internal.LocalityID]string)
|
||||
for _, priority := range priorities {
|
||||
var nameFound string
|
||||
for _, locality := range priority {
|
||||
if name, ok := ng.existingNames[locality.ID]; ok {
|
||||
if !usedNames[name] {
|
||||
nameFound = name
|
||||
// Found a name to use. No need to process the remaining
|
||||
// localities.
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if nameFound == "" {
|
||||
// No appropriate used name is found. Make a new name.
|
||||
nameFound = fmt.Sprintf("priority-%d-%d", ng.prefix, ng.nextID)
|
||||
ng.nextID++
|
||||
}
|
||||
|
||||
ret = append(ret, nameFound)
|
||||
// All localities in this priority share the same name. Add them all to
|
||||
// the new map.
|
||||
for _, l := range priority {
|
||||
newNames[l.ID] = nameFound
|
||||
}
|
||||
usedNames[nameFound] = true
|
||||
}
|
||||
ng.existingNames = newNames
|
||||
return ret
|
||||
}
|
||||
|
|
@ -0,0 +1,111 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package clusterresolver
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"google.golang.org/grpc/xds/internal"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
|
||||
)
|
||||
|
||||
func Test_nameGenerator_generate(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
prefix uint64
|
||||
input1 [][]xdsresource.Locality
|
||||
input2 [][]xdsresource.Locality
|
||||
want []string
|
||||
}{
|
||||
{
|
||||
name: "init, two new priorities",
|
||||
prefix: 3,
|
||||
input1: nil,
|
||||
input2: [][]xdsresource.Locality{
|
||||
{{ID: internal.LocalityID{Zone: "L0"}}},
|
||||
{{ID: internal.LocalityID{Zone: "L1"}}},
|
||||
},
|
||||
want: []string{"priority-3-0", "priority-3-1"},
|
||||
},
|
||||
{
|
||||
name: "one new priority",
|
||||
prefix: 1,
|
||||
input1: [][]xdsresource.Locality{
|
||||
{{ID: internal.LocalityID{Zone: "L0"}}},
|
||||
},
|
||||
input2: [][]xdsresource.Locality{
|
||||
{{ID: internal.LocalityID{Zone: "L0"}}},
|
||||
{{ID: internal.LocalityID{Zone: "L1"}}},
|
||||
},
|
||||
want: []string{"priority-1-0", "priority-1-1"},
|
||||
},
|
||||
{
|
||||
name: "merge two priorities",
|
||||
prefix: 4,
|
||||
input1: [][]xdsresource.Locality{
|
||||
{{ID: internal.LocalityID{Zone: "L0"}}},
|
||||
{{ID: internal.LocalityID{Zone: "L1"}}},
|
||||
{{ID: internal.LocalityID{Zone: "L2"}}},
|
||||
},
|
||||
input2: [][]xdsresource.Locality{
|
||||
{{ID: internal.LocalityID{Zone: "L0"}}, {ID: internal.LocalityID{Zone: "L1"}}},
|
||||
{{ID: internal.LocalityID{Zone: "L2"}}},
|
||||
},
|
||||
want: []string{"priority-4-0", "priority-4-2"},
|
||||
},
|
||||
{
|
||||
name: "swap two priorities",
|
||||
input1: [][]xdsresource.Locality{
|
||||
{{ID: internal.LocalityID{Zone: "L0"}}},
|
||||
{{ID: internal.LocalityID{Zone: "L1"}}},
|
||||
{{ID: internal.LocalityID{Zone: "L2"}}},
|
||||
},
|
||||
input2: [][]xdsresource.Locality{
|
||||
{{ID: internal.LocalityID{Zone: "L1"}}},
|
||||
{{ID: internal.LocalityID{Zone: "L0"}}},
|
||||
{{ID: internal.LocalityID{Zone: "L2"}}},
|
||||
},
|
||||
want: []string{"priority-0-1", "priority-0-0", "priority-0-2"},
|
||||
},
|
||||
{
|
||||
name: "split priority",
|
||||
input1: [][]xdsresource.Locality{
|
||||
{{ID: internal.LocalityID{Zone: "L0"}}, {ID: internal.LocalityID{Zone: "L1"}}},
|
||||
{{ID: internal.LocalityID{Zone: "L2"}}},
|
||||
},
|
||||
input2: [][]xdsresource.Locality{
|
||||
{{ID: internal.LocalityID{Zone: "L0"}}},
|
||||
{{ID: internal.LocalityID{Zone: "L1"}}}, // This gets a newly generated name, sice "0-0" was already picked.
|
||||
{{ID: internal.LocalityID{Zone: "L2"}}},
|
||||
},
|
||||
want: []string{"priority-0-0", "priority-0-2", "priority-0-1"},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ng := newNameGenerator(tt.prefix)
|
||||
got1 := ng.generate(tt.input1)
|
||||
t.Logf("%v", got1)
|
||||
got := ng.generate(tt.input2)
|
||||
if diff := cmp.Diff(got, tt.want); diff != "" {
|
||||
t.Errorf("generate() = got: %v, want: %v, diff (-got +want): %s", got, tt.want, diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -146,12 +146,14 @@ func TestBuildPriorityConfigJSON(t *testing.T) {
|
|||
testLocalitiesP1[1],
|
||||
},
|
||||
},
|
||||
childNameGen: newNameGenerator(0),
|
||||
},
|
||||
{
|
||||
mechanism: DiscoveryMechanism{
|
||||
Type: DiscoveryMechanismTypeLogicalDNS,
|
||||
},
|
||||
addresses: testAddressStrs[4],
|
||||
addresses: testAddressStrs[4],
|
||||
childNameGen: newNameGenerator(1),
|
||||
},
|
||||
}, nil)
|
||||
if err != nil {
|
||||
|
|
@ -196,13 +198,15 @@ func TestBuildPriorityConfig(t *testing.T) {
|
|||
testLocalitiesP1[1],
|
||||
},
|
||||
},
|
||||
childNameGen: newNameGenerator(0),
|
||||
},
|
||||
{
|
||||
mechanism: DiscoveryMechanism{
|
||||
Cluster: testClusterName2,
|
||||
Type: DiscoveryMechanismTypeLogicalDNS,
|
||||
},
|
||||
addresses: testAddressStrs[4],
|
||||
addresses: testAddressStrs[4],
|
||||
childNameGen: newNameGenerator(1),
|
||||
},
|
||||
}, nil)
|
||||
|
||||
|
|
@ -309,7 +313,7 @@ func TestBuildPriorityConfig(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestBuildClusterImplConfigForDNS(t *testing.T) {
|
||||
gotName, gotConfig, gotAddrs := buildClusterImplConfigForDNS(3, testAddressStrs[0], DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS})
|
||||
gotName, gotConfig, gotAddrs := buildClusterImplConfigForDNS(newNameGenerator(3), testAddressStrs[0], DiscoveryMechanism{Cluster: testClusterName2, Type: DiscoveryMechanismTypeLogicalDNS})
|
||||
wantName := "priority-3"
|
||||
wantConfig := &clusterimpl.LBConfig{
|
||||
Cluster: testClusterName2,
|
||||
|
|
@ -335,7 +339,7 @@ func TestBuildClusterImplConfigForDNS(t *testing.T) {
|
|||
|
||||
func TestBuildClusterImplConfigForEDS(t *testing.T) {
|
||||
gotNames, gotConfigs, gotAddrs, _ := buildClusterImplConfigForEDS(
|
||||
2,
|
||||
newNameGenerator(2),
|
||||
xdsresource.EndpointsUpdate{
|
||||
Drops: []xdsresource.OverloadDropConfig{
|
||||
{
|
||||
|
|
@ -465,32 +469,28 @@ func TestGroupLocalitiesByPriority(t *testing.T) {
|
|||
tests := []struct {
|
||||
name string
|
||||
localities []xdsresource.Locality
|
||||
wantPriorities []string
|
||||
wantLocalities map[string][]xdsresource.Locality
|
||||
wantLocalities [][]xdsresource.Locality
|
||||
}{
|
||||
{
|
||||
name: "1 locality 1 priority",
|
||||
localities: []xdsresource.Locality{testLocalitiesP0[0]},
|
||||
wantPriorities: []string{"0"},
|
||||
wantLocalities: map[string][]xdsresource.Locality{
|
||||
"0": {testLocalitiesP0[0]},
|
||||
name: "1 locality 1 priority",
|
||||
localities: []xdsresource.Locality{testLocalitiesP0[0]},
|
||||
wantLocalities: [][]xdsresource.Locality{
|
||||
{testLocalitiesP0[0]},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "2 locality 1 priority",
|
||||
localities: []xdsresource.Locality{testLocalitiesP0[0], testLocalitiesP0[1]},
|
||||
wantPriorities: []string{"0"},
|
||||
wantLocalities: map[string][]xdsresource.Locality{
|
||||
"0": {testLocalitiesP0[0], testLocalitiesP0[1]},
|
||||
name: "2 locality 1 priority",
|
||||
localities: []xdsresource.Locality{testLocalitiesP0[0], testLocalitiesP0[1]},
|
||||
wantLocalities: [][]xdsresource.Locality{
|
||||
{testLocalitiesP0[0], testLocalitiesP0[1]},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "1 locality in each",
|
||||
localities: []xdsresource.Locality{testLocalitiesP0[0], testLocalitiesP1[0]},
|
||||
wantPriorities: []string{"0", "1"},
|
||||
wantLocalities: map[string][]xdsresource.Locality{
|
||||
"0": {testLocalitiesP0[0]},
|
||||
"1": {testLocalitiesP1[0]},
|
||||
name: "1 locality in each",
|
||||
localities: []xdsresource.Locality{testLocalitiesP0[0], testLocalitiesP1[0]},
|
||||
wantLocalities: [][]xdsresource.Locality{
|
||||
{testLocalitiesP0[0]},
|
||||
{testLocalitiesP1[0]},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -498,10 +498,9 @@ func TestGroupLocalitiesByPriority(t *testing.T) {
|
|||
localities: []xdsresource.Locality{
|
||||
testLocalitiesP0[0], testLocalitiesP0[1],
|
||||
testLocalitiesP1[0], testLocalitiesP1[1]},
|
||||
wantPriorities: []string{"0", "1"},
|
||||
wantLocalities: map[string][]xdsresource.Locality{
|
||||
"0": {testLocalitiesP0[0], testLocalitiesP0[1]},
|
||||
"1": {testLocalitiesP1[0], testLocalitiesP1[1]},
|
||||
wantLocalities: [][]xdsresource.Locality{
|
||||
{testLocalitiesP0[0], testLocalitiesP0[1]},
|
||||
{testLocalitiesP1[0], testLocalitiesP1[1]},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -512,19 +511,15 @@ func TestGroupLocalitiesByPriority(t *testing.T) {
|
|||
localities: []xdsresource.Locality{
|
||||
testLocalitiesP1[1], testLocalitiesP0[1],
|
||||
testLocalitiesP1[0], testLocalitiesP0[0]},
|
||||
wantPriorities: []string{"0", "1"},
|
||||
wantLocalities: map[string][]xdsresource.Locality{
|
||||
"0": {testLocalitiesP0[1], testLocalitiesP0[0]},
|
||||
"1": {testLocalitiesP1[1], testLocalitiesP1[0]},
|
||||
wantLocalities: [][]xdsresource.Locality{
|
||||
{testLocalitiesP0[1], testLocalitiesP0[0]},
|
||||
{testLocalitiesP1[1], testLocalitiesP1[0]},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
gotPriorities, gotLocalities := groupLocalitiesByPriority(tt.localities)
|
||||
if diff := cmp.Diff(gotPriorities, tt.wantPriorities); diff != "" {
|
||||
t.Errorf("groupLocalitiesByPriority() diff(-got +want) %v", diff)
|
||||
}
|
||||
gotLocalities := groupLocalitiesByPriority(tt.localities)
|
||||
if diff := cmp.Diff(gotLocalities, tt.wantLocalities); diff != "" {
|
||||
t.Errorf("groupLocalitiesByPriority() diff(-got +want) %v", diff)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -55,6 +55,8 @@ type resolverMechanismTuple struct {
|
|||
dm DiscoveryMechanism
|
||||
dmKey discoveryMechanismKey
|
||||
r discoveryMechanism
|
||||
|
||||
childNameGen *nameGenerator
|
||||
}
|
||||
|
||||
type resourceResolver struct {
|
||||
|
|
@ -62,17 +64,28 @@ type resourceResolver struct {
|
|||
updateChannel chan *resourceUpdate
|
||||
|
||||
// mu protects the slice and map, and content of the resolvers in the slice.
|
||||
mu sync.Mutex
|
||||
mechanisms []DiscoveryMechanism
|
||||
children []resolverMechanismTuple
|
||||
childrenMap map[discoveryMechanismKey]discoveryMechanism
|
||||
mu sync.Mutex
|
||||
mechanisms []DiscoveryMechanism
|
||||
children []resolverMechanismTuple
|
||||
// childrenMap's value only needs the resolver implementation (type
|
||||
// discoveryMechanism) and the childNameGen. The other two fields are not
|
||||
// used.
|
||||
//
|
||||
// TODO(cleanup): maybe we can make a new type with just the necessary
|
||||
// fields, and use it here instead.
|
||||
childrenMap map[discoveryMechanismKey]resolverMechanismTuple
|
||||
// Each new discovery mechanism needs a child name generator to reuse child
|
||||
// policy names. But to make sure the names across discover mechanism
|
||||
// doesn't conflict, we need a seq ID. This ID is incremented for each new
|
||||
// discover mechanism.
|
||||
childNameGeneratorSeqID uint64
|
||||
}
|
||||
|
||||
func newResourceResolver(parent *clusterResolverBalancer) *resourceResolver {
|
||||
return &resourceResolver{
|
||||
parent: parent,
|
||||
updateChannel: make(chan *resourceUpdate, 1),
|
||||
childrenMap: make(map[discoveryMechanismKey]discoveryMechanism),
|
||||
childrenMap: make(map[discoveryMechanismKey]resolverMechanismTuple),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -112,31 +125,54 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
|
|||
dmKey := discoveryMechanismKey{typ: dm.Type, name: nameToWatch}
|
||||
newDMs[dmKey] = true
|
||||
|
||||
r := rr.childrenMap[dmKey]
|
||||
if r == nil {
|
||||
r = newEDSResolver(nameToWatch, rr.parent.xdsClient, rr)
|
||||
r, ok := rr.childrenMap[dmKey]
|
||||
if !ok {
|
||||
r = resolverMechanismTuple{
|
||||
dm: dm,
|
||||
dmKey: dmKey,
|
||||
r: newEDSResolver(nameToWatch, rr.parent.xdsClient, rr),
|
||||
childNameGen: newNameGenerator(rr.childNameGeneratorSeqID),
|
||||
}
|
||||
rr.childrenMap[dmKey] = r
|
||||
rr.childNameGeneratorSeqID++
|
||||
} else {
|
||||
// If this is not new, keep the fields (especially
|
||||
// childNameGen), and only update the DiscoveryMechanism.
|
||||
//
|
||||
// Note that the same dmKey doesn't mean the same
|
||||
// DiscoveryMechanism. There are fields (e.g.
|
||||
// MaxConcurrentRequests) in DiscoveryMechanism that are not
|
||||
// copied to dmKey, we need to keep those updated.
|
||||
r.dm = dm
|
||||
}
|
||||
rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r}
|
||||
rr.children[i] = r
|
||||
case DiscoveryMechanismTypeLogicalDNS:
|
||||
// Name to resolve in DNS is the hostname, not the ClientConn
|
||||
// target.
|
||||
dmKey := discoveryMechanismKey{typ: dm.Type, name: dm.DNSHostname}
|
||||
newDMs[dmKey] = true
|
||||
|
||||
r := rr.childrenMap[dmKey]
|
||||
if r == nil {
|
||||
r = newDNSResolver(dm.DNSHostname, rr)
|
||||
r, ok := rr.childrenMap[dmKey]
|
||||
if !ok {
|
||||
r = resolverMechanismTuple{
|
||||
dm: dm,
|
||||
dmKey: dmKey,
|
||||
r: newDNSResolver(dm.DNSHostname, rr),
|
||||
childNameGen: newNameGenerator(rr.childNameGeneratorSeqID),
|
||||
}
|
||||
rr.childrenMap[dmKey] = r
|
||||
rr.childNameGeneratorSeqID++
|
||||
} else {
|
||||
r.dm = dm
|
||||
}
|
||||
rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r}
|
||||
rr.children[i] = r
|
||||
}
|
||||
}
|
||||
// Stop the resources that were removed.
|
||||
for dm, r := range rr.childrenMap {
|
||||
if !newDMs[dm] {
|
||||
delete(rr.childrenMap, dm)
|
||||
r.stop()
|
||||
r.r.stop()
|
||||
}
|
||||
}
|
||||
// Regenerate even if there's no change in discovery mechanism, in case
|
||||
|
|
@ -150,7 +186,7 @@ func (rr *resourceResolver) resolveNow() {
|
|||
rr.mu.Lock()
|
||||
defer rr.mu.Unlock()
|
||||
for _, r := range rr.childrenMap {
|
||||
r.resolveNow()
|
||||
r.r.resolveNow()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -159,7 +195,7 @@ func (rr *resourceResolver) stop() {
|
|||
defer rr.mu.Unlock()
|
||||
for dm, r := range rr.childrenMap {
|
||||
delete(rr.childrenMap, dm)
|
||||
r.stop()
|
||||
r.r.stop()
|
||||
}
|
||||
rr.mechanisms = nil
|
||||
rr.children = nil
|
||||
|
|
@ -174,13 +210,7 @@ func (rr *resourceResolver) stop() {
|
|||
func (rr *resourceResolver) generate() {
|
||||
var ret []priorityConfig
|
||||
for _, rDM := range rr.children {
|
||||
r, ok := rr.childrenMap[rDM.dmKey]
|
||||
if !ok {
|
||||
rr.parent.logger.Infof("resolver for %+v not found, should never happen", rDM.dmKey)
|
||||
continue
|
||||
}
|
||||
|
||||
u, ok := r.lastUpdate()
|
||||
u, ok := rDM.r.lastUpdate()
|
||||
if !ok {
|
||||
// Don't send updates to parent until all resolvers have update to
|
||||
// send.
|
||||
|
|
@ -188,9 +218,9 @@ func (rr *resourceResolver) generate() {
|
|||
}
|
||||
switch uu := u.(type) {
|
||||
case xdsresource.EndpointsUpdate:
|
||||
ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu})
|
||||
ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu, childNameGen: rDM.childNameGen})
|
||||
case []string:
|
||||
ret = append(ret, priorityConfig{mechanism: rDM.dm, addresses: uu})
|
||||
ret = append(ret, priorityConfig{mechanism: rDM.dm, addresses: uu, childNameGen: rDM.childNameGen})
|
||||
}
|
||||
}
|
||||
select {
|
||||
|
|
|
|||
|
|
@ -68,7 +68,8 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) {
|
|||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServcie,
|
||||
},
|
||||
edsResp: testEDSUpdates[0],
|
||||
edsResp: testEDSUpdates[0],
|
||||
childNameGen: newNameGenerator(0),
|
||||
}},
|
||||
},
|
||||
{
|
||||
|
|
@ -81,7 +82,8 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) {
|
|||
Type: DiscoveryMechanismTypeEDS,
|
||||
Cluster: testClusterName,
|
||||
},
|
||||
edsResp: testEDSUpdates[1],
|
||||
edsResp: testEDSUpdates[1],
|
||||
childNameGen: newNameGenerator(0),
|
||||
}},
|
||||
},
|
||||
} {
|
||||
|
|
@ -107,7 +109,7 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) {
|
|||
fakeClient.InvokeWatchEDSCallback("", test.edsUpdate, nil)
|
||||
select {
|
||||
case u := <-rr.updateChannel:
|
||||
if diff := cmp.Diff(u.priorities, test.want, cmp.AllowUnexported(priorityConfig{})); diff != "" {
|
||||
if diff := cmp.Diff(u.priorities, test.want, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" {
|
||||
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
@ -161,7 +163,8 @@ func (s) TestResourceResolverOneDNSResource(t *testing.T) {
|
|||
Type: DiscoveryMechanismTypeLogicalDNS,
|
||||
DNSHostname: testDNSTarget,
|
||||
},
|
||||
addresses: []string{"1.1.1.1", "2.2.2.2"},
|
||||
addresses: []string{"1.1.1.1", "2.2.2.2"},
|
||||
childNameGen: newNameGenerator(0),
|
||||
}},
|
||||
},
|
||||
} {
|
||||
|
|
@ -189,7 +192,7 @@ func (s) TestResourceResolverOneDNSResource(t *testing.T) {
|
|||
dnsR.UpdateState(resolver.State{Addresses: test.addrs})
|
||||
select {
|
||||
case u := <-rr.updateChannel:
|
||||
if diff := cmp.Diff(u.priorities, test.want, cmp.AllowUnexported(priorityConfig{})); diff != "" {
|
||||
if diff := cmp.Diff(u.priorities, test.want, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" {
|
||||
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
@ -243,8 +246,9 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) {
|
|||
Cluster: testClusterName,
|
||||
EDSServiceName: testEDSServcie,
|
||||
},
|
||||
edsResp: testEDSUpdates[0],
|
||||
}}, cmp.AllowUnexported(priorityConfig{})); diff != "" {
|
||||
edsResp: testEDSUpdates[0],
|
||||
childNameGen: newNameGenerator(0),
|
||||
}}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" {
|
||||
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
@ -289,8 +293,9 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) {
|
|||
Type: DiscoveryMechanismTypeEDS,
|
||||
Cluster: testClusterName,
|
||||
},
|
||||
edsResp: testEDSUpdates[1],
|
||||
}}, cmp.AllowUnexported(priorityConfig{})); diff != "" {
|
||||
edsResp: testEDSUpdates[1],
|
||||
childNameGen: newNameGenerator(1),
|
||||
}}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" {
|
||||
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
@ -317,8 +322,9 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) {
|
|||
Cluster: testClusterName,
|
||||
MaxConcurrentRequests: newUint32(123),
|
||||
},
|
||||
edsResp: testEDSUpdates[1],
|
||||
}}, cmp.AllowUnexported(priorityConfig{})); diff != "" {
|
||||
edsResp: testEDSUpdates[1],
|
||||
childNameGen: newNameGenerator(1),
|
||||
}}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" {
|
||||
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
@ -388,7 +394,8 @@ func (s) TestResourceResolverNoChangeNoUpdate(t *testing.T) {
|
|||
Type: DiscoveryMechanismTypeEDS,
|
||||
Cluster: testClusterNames[0],
|
||||
},
|
||||
edsResp: testEDSUpdates[0],
|
||||
edsResp: testEDSUpdates[0],
|
||||
childNameGen: newNameGenerator(0),
|
||||
},
|
||||
{
|
||||
mechanism: DiscoveryMechanism{
|
||||
|
|
@ -396,9 +403,10 @@ func (s) TestResourceResolverNoChangeNoUpdate(t *testing.T) {
|
|||
Cluster: testClusterNames[1],
|
||||
MaxConcurrentRequests: newUint32(100),
|
||||
},
|
||||
edsResp: testEDSUpdates[1],
|
||||
edsResp: testEDSUpdates[1],
|
||||
childNameGen: newNameGenerator(1),
|
||||
},
|
||||
}, cmp.AllowUnexported(priorityConfig{})); diff != "" {
|
||||
}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" {
|
||||
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
@ -500,16 +508,18 @@ func (s) TestResourceResolverChangePriority(t *testing.T) {
|
|||
Type: DiscoveryMechanismTypeEDS,
|
||||
Cluster: testClusterNames[0],
|
||||
},
|
||||
edsResp: testEDSUpdates[0],
|
||||
edsResp: testEDSUpdates[0],
|
||||
childNameGen: newNameGenerator(0),
|
||||
},
|
||||
{
|
||||
mechanism: DiscoveryMechanism{
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
Cluster: testClusterNames[1],
|
||||
},
|
||||
edsResp: testEDSUpdates[1],
|
||||
edsResp: testEDSUpdates[1],
|
||||
childNameGen: newNameGenerator(1),
|
||||
},
|
||||
}, cmp.AllowUnexported(priorityConfig{})); diff != "" {
|
||||
}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" {
|
||||
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
@ -541,16 +551,18 @@ func (s) TestResourceResolverChangePriority(t *testing.T) {
|
|||
Type: DiscoveryMechanismTypeEDS,
|
||||
Cluster: testClusterNames[1],
|
||||
},
|
||||
edsResp: testEDSUpdates[1],
|
||||
edsResp: testEDSUpdates[1],
|
||||
childNameGen: newNameGenerator(1),
|
||||
},
|
||||
{
|
||||
mechanism: DiscoveryMechanism{
|
||||
Type: DiscoveryMechanismTypeEDS,
|
||||
Cluster: testClusterNames[0],
|
||||
},
|
||||
edsResp: testEDSUpdates[0],
|
||||
edsResp: testEDSUpdates[0],
|
||||
childNameGen: newNameGenerator(0),
|
||||
},
|
||||
}, cmp.AllowUnexported(priorityConfig{})); diff != "" {
|
||||
}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" {
|
||||
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
@ -628,16 +640,18 @@ func (s) TestResourceResolverEDSAndDNS(t *testing.T) {
|
|||
Type: DiscoveryMechanismTypeEDS,
|
||||
Cluster: testClusterName,
|
||||
},
|
||||
edsResp: testEDSUpdates[0],
|
||||
edsResp: testEDSUpdates[0],
|
||||
childNameGen: newNameGenerator(0),
|
||||
},
|
||||
{
|
||||
mechanism: DiscoveryMechanism{
|
||||
Type: DiscoveryMechanismTypeLogicalDNS,
|
||||
DNSHostname: testDNSTarget,
|
||||
},
|
||||
addresses: []string{"1.1.1.1", "2.2.2.2"},
|
||||
addresses: []string{"1.1.1.1", "2.2.2.2"},
|
||||
childNameGen: newNameGenerator(1),
|
||||
},
|
||||
}, cmp.AllowUnexported(priorityConfig{})); diff != "" {
|
||||
}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" {
|
||||
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
@ -689,8 +703,9 @@ func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) {
|
|||
Type: DiscoveryMechanismTypeEDS,
|
||||
Cluster: testClusterName,
|
||||
},
|
||||
edsResp: testEDSUpdates[0],
|
||||
}}, cmp.AllowUnexported(priorityConfig{})); diff != "" {
|
||||
edsResp: testEDSUpdates[0],
|
||||
childNameGen: newNameGenerator(0),
|
||||
}}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" {
|
||||
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
@ -726,8 +741,9 @@ func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) {
|
|||
Type: DiscoveryMechanismTypeLogicalDNS,
|
||||
DNSHostname: testDNSTarget,
|
||||
},
|
||||
addresses: []string{"1.1.1.1", "2.2.2.2"},
|
||||
}}, cmp.AllowUnexported(priorityConfig{})); diff != "" {
|
||||
addresses: []string{"1.1.1.1", "2.2.2.2"},
|
||||
childNameGen: newNameGenerator(1),
|
||||
}}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" {
|
||||
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
@ -847,8 +863,9 @@ func (s) TestResourceResolverDNSResolveNow(t *testing.T) {
|
|||
Type: DiscoveryMechanismTypeLogicalDNS,
|
||||
DNSHostname: testDNSTarget,
|
||||
},
|
||||
addresses: []string{"1.1.1.1", "2.2.2.2"},
|
||||
}}, cmp.AllowUnexported(priorityConfig{})); diff != "" {
|
||||
addresses: []string{"1.1.1.1", "2.2.2.2"},
|
||||
childNameGen: newNameGenerator(0),
|
||||
}}, cmp.AllowUnexported(priorityConfig{}, nameGenerator{})); diff != "" {
|
||||
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
|
|||
Loading…
Reference in New Issue