xds: Add support for Custom LB Policies (#6224)

Zach Reyes 2023-05-08 21:29:36 -04:00 committed by GitHub
parent 5c4bee51c2
commit 5e587344ee
32 changed files with 955 additions and 725 deletions


@ -25,6 +25,11 @@
// later release.
package attributes
import (
"fmt"
"strings"
)
// Attributes is an immutable struct for storing and retrieving generic
// key/value pairs. Keys must be hashable, and users should define their own
// types for keys. Values should not be modified after they are added to an
@ -99,3 +104,27 @@ func (a *Attributes) Equal(o *Attributes) bool {
}
return true
}
// String prints the attribute map. If any key or value in the map implements
// fmt.Stringer, its String method is used; otherwise it renders as "".
func (a *Attributes) String() string {
var sb strings.Builder
sb.WriteString("{")
first := true
for k, v := range a.m {
var key, val string
if str, ok := k.(interface{ String() string }); ok {
key = str.String()
}
if str, ok := v.(interface{ String() string }); ok {
val = str.String()
}
if !first {
sb.WriteString(", ")
}
sb.WriteString(fmt.Sprintf("%q: %q", key, val))
first = false
}
sb.WriteString("}")
return sb.String()
}
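// A usage sketch (the key and value types here are hypothetical, not part of
// this change; note that keys and values that do not implement fmt.Stringer
// render as ""):
//
//	type key struct{ name string }
//	func (k key) String() string { return k.name }
//	type val struct{ s string }
//	func (v val) String() string { return v.s }
//
//	a := attributes.New(key{name: "locality"}, val{s: "zone-a"})
//	fmt.Println(a.String()) // {"locality": "zone-a"}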


@ -28,6 +28,8 @@
package weightedroundrobin
import (
"fmt"
"google.golang.org/grpc/resolver"
)
@ -61,3 +63,7 @@ func GetAddrInfo(addr resolver.Address) AddrInfo {
ai, _ := v.(AddrInfo)
return ai
}
func (a AddrInfo) String() string {
return fmt.Sprintf("Weight: %d", a.Weight)
}
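// A sketch of how this surfaces when inspecting addresses (using the
// SetAddrInfo/GetAddrInfo helpers in this package; illustrative only):
//
//	addr := resolver.Address{Addr: "10.0.0.1:8080"}
//	addr = SetAddrInfo(addr, AddrInfo{Weight: 3})
//	fmt.Println(GetAddrInfo(addr)) // Weight: 3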


@ -178,6 +178,14 @@ func (wbsa *Aggregator) ResumeStateUpdates() {
}
}
// NeedUpdateStateOnResume sets the needUpdateStateOnResume bool to true,
// ensuring a picker update is sent once ResumeStateUpdates is called.
func (wbsa *Aggregator) NeedUpdateStateOnResume() {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
wbsa.needUpdateStateOnResume = true
}
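// A minimal sketch of the intended call pattern (assuming the existing
// PauseStateUpdates/ResumeStateUpdates API on Aggregator; illustrative only):
//
//	wbsa.PauseStateUpdates()        // buffer picker updates
//	defer wbsa.ResumeStateUpdates() // flush buffered state on exit
//	// ... remove all child policies ...
//	wbsa.NeedUpdateStateOnResume()  // force a fresh picker even with no children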
// UpdateState is called to report a balancer state change from sub-balancer.
// It's usually called by the balancer group.
//


@ -143,6 +143,18 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat
b.targets = newConfig.Targets
// If the targets length is zero, it means we have removed all child
// policies from the balancer group and aggregator.
// At the start of this UpdateClientConnState() operation, a call to
// b.stateAggregator.ResumeStateUpdates() is deferred. Thus, setting the
// needUpdateStateOnResume bool to true here ensures a new picker is built
// as part of that deferred function. Since there are now no child
// policies, the aggregated connectivity state reported from the Aggregator
// will be TRANSIENT_FAILURE.
if len(b.targets) == 0 {
b.stateAggregator.NeedUpdateStateOnResume()
}
return nil
}


@ -166,7 +166,8 @@ func init() {
// TestWeightedTarget covers the cases where a sub-balancer is added and a
// sub-balancer is removed. It verifies that the addresses and balancer configs
// are forwarded to the right sub-balancer. This test is intended to test the
// glue code in weighted_target.
// glue code in weighted_target. It also tests an empty target config update,
// which should trigger a transient failure state update.
func (s) TestWeightedTarget(t *testing.T) {
cc := testutils.NewTestClientConn(t)
wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
@ -306,6 +307,24 @@ func (s) TestWeightedTarget(t *testing.T) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3)
}
}
// Update the Weighted Target Balancer with an empty address list and no
// targets. This should cause a TRANSIENT_FAILURE state update to be sent
// to the ClientConn.
emptyConfig, err := wtbParser.ParseConfig([]byte(`{}`))
if err != nil {
t.Fatalf("Failed to parse balancer config: %v", err)
}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{},
BalancerConfig: emptyConfig,
}); err != nil {
t.Fatalf("Failed to update ClientConn state: %v", err)
}
state := <-cc.NewStateCh
if state != connectivity.TransientFailure {
t.Fatalf("Empty target update should have triggered a TF state update, got: %v", state)
}
}
// TestWeightedTarget_OneSubBalancer_AddRemoveBackend tests the case where we


@ -524,6 +524,14 @@ func ClusterResourceWithOptions(opts ClusterOptions) *v3clusterpb.Cluster {
return cluster
}
// LocalityOptions contains options to configure a Locality.
type LocalityOptions struct {
// Ports is a set of ports on "localhost" belonging to this locality.
Ports []uint32
// Weight is the weight of the locality, used for load balancing.
Weight uint32
}
// EndpointOptions contains options to configure an Endpoint (or
// ClusterLoadAssignment) resource.
type EndpointOptions struct {
@ -533,9 +541,8 @@ type EndpointOptions struct {
// Host is the hostname of the endpoints. In our e2e tests, hostname must
// always be "localhost".
Host string
// Ports is a set of ports on "localhost" where the endpoints corresponding
// to this resource reside.
Ports []uint32
// Localities is a set of localities belonging to this resource.
Localities []LocalityOptions
// DropPercents is a map from drop category to a drop percentage. If unset,
// no drops are configured.
DropPercents map[string]int
@ -546,34 +553,50 @@ func DefaultEndpoint(clusterName string, host string, ports []uint32) *v3endpoin
return EndpointResourceWithOptions(EndpointOptions{
ClusterName: clusterName,
Host: host,
Ports: ports,
Localities: []LocalityOptions{
{
Ports: ports,
Weight: 1,
},
},
})
}
// EndpointResourceWithOptions returns an xds Endpoint resource configured with
// the provided options.
func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, port := range opts.Ports {
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
var endpoints []*v3endpointpb.LocalityLbEndpoints
for i, locality := range opts.Localities {
var lbEndpoints []*v3endpointpb.LbEndpoint
for _, port := range locality.Ports {
lbEndpoints = append(lbEndpoints, &v3endpointpb.LbEndpoint{
HostIdentifier: &v3endpointpb.LbEndpoint_Endpoint{Endpoint: &v3endpointpb.Endpoint{
Address: &v3corepb.Address{Address: &v3corepb.Address_SocketAddress{
SocketAddress: &v3corepb.SocketAddress{
Protocol: v3corepb.SocketAddress_TCP,
Address: opts.Host,
PortSpecifier: &v3corepb.SocketAddress_PortValue{PortValue: port}},
}},
}},
}},
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
})
}
endpoints = append(endpoints, &v3endpointpb.LocalityLbEndpoints{
Locality: &v3corepb.Locality{
Region: fmt.Sprintf("region-%d", i+1),
Zone: fmt.Sprintf("zone-%d", i+1),
SubZone: fmt.Sprintf("subzone-%d", i+1),
},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: locality.Weight},
Priority: 0,
})
}
cla := &v3endpointpb.ClusterLoadAssignment{
ClusterName: opts.ClusterName,
Endpoints: []*v3endpointpb.LocalityLbEndpoints{{
Locality: &v3corepb.Locality{SubZone: "subzone"},
LbEndpoints: lbEndpoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: 1},
Priority: 0,
}},
Endpoints: endpoints,
}
var drops []*v3endpointpb.ClusterLoadAssignment_Policy_DropOverload


@ -22,13 +22,13 @@ package resolver
import (
"context"
"fmt"
"net"
"net/url"
"strings"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/serviceconfig"
)
@ -124,7 +124,7 @@ type Address struct {
Attributes *attributes.Attributes
// BalancerAttributes contains arbitrary data about this address intended
// for consumption by the LB policy. These attribes do not affect SubConn
// for consumption by the LB policy. These attributes do not affect SubConn
// creation, connection establishment, handshaking, etc.
BalancerAttributes *attributes.Attributes
@ -151,7 +151,17 @@ func (a Address) Equal(o Address) bool {
// String returns JSON formatted string representation of the address.
func (a Address) String() string {
return pretty.ToJSON(a)
var sb strings.Builder
sb.WriteString(fmt.Sprintf("{Addr: %q, ", a.Addr))
sb.WriteString(fmt.Sprintf("ServerName: %q, ", a.ServerName))
if a.Attributes != nil {
sb.WriteString(fmt.Sprintf("Attributes: %v, ", a.Attributes.String()))
}
if a.BalancerAttributes != nil {
sb.WriteString(fmt.Sprintf("BalancerAttributes: %v", a.BalancerAttributes.String()))
}
sb.WriteString("}")
return sb.String()
}
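// For illustration, an address with no attributes renders roughly as follows
// (note the trailing separator before the closing brace):
//
//	a := Address{Addr: "10.0.0.1:443", ServerName: "foo"}
//	fmt.Println(a.String()) // {Addr: "10.0.0.1:443", ServerName: "foo", }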
// BuildOptions includes additional information for the builder to create


@ -0,0 +1,231 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds_test
import (
"context"
"fmt"
"testing"
v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3roundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/round_robin/v3"
v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3"
"github.com/golang/protobuf/proto"
structpb "github.com/golang/protobuf/ptypes/struct"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/resolver"
)
// wrrLocality is a helper that takes a proto message and returns a
// WrrLocality proto with the message marshaled into a proto.Any and set as
// its endpoint picking policy child.
func wrrLocality(m proto.Message) *v3wrrlocalitypb.WrrLocality {
return &v3wrrlocalitypb.WrrLocality{
EndpointPickingPolicy: &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
{
TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
TypedConfig: testutils.MarshalAny(m),
},
},
},
},
}
}
// clusterWithLBConfiguration returns a cluster resource with the passed-in
// proto message marshaled into an Any and specified through the
// load_balancing_policy field.
func clusterWithLBConfiguration(clusterName, edsServiceName string, secLevel e2e.SecurityLevel, m proto.Message) *v3clusterpb.Cluster {
cluster := e2e.DefaultCluster(clusterName, edsServiceName, secLevel)
cluster.LoadBalancingPolicy = &v3clusterpb.LoadBalancingPolicy{
Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{
{
TypedExtensionConfig: &v3corepb.TypedExtensionConfig{
TypedConfig: testutils.MarshalAny(m),
},
},
},
}
return cluster
}
// TestWrrLocality tests RPC distribution across a scenario with 5 backends,
// with 2 backends in a locality with weight 1, and 3 backends in a second
// locality with weight 2. Through xDS, the test configures a
// wrr_locality_balancer with either a round robin or custom (specifying pick
// first) child load balancing policy, and asserts the correct distribution
// based on the locality weights and the endpoint picking policy specified.
func (s) TestWrrLocality(t *testing.T) {
oldCustomLBSupport := envconfig.XDSCustomLBPolicy
envconfig.XDSCustomLBPolicy = true
defer func() {
envconfig.XDSCustomLBPolicy = oldCustomLBSupport
}()
backend1 := stubserver.StartTestService(t, nil)
port1 := testutils.ParsePort(t, backend1.Address)
defer backend1.Stop()
backend2 := stubserver.StartTestService(t, nil)
port2 := testutils.ParsePort(t, backend2.Address)
defer backend2.Stop()
backend3 := stubserver.StartTestService(t, nil)
port3 := testutils.ParsePort(t, backend3.Address)
defer backend3.Stop()
backend4 := stubserver.StartTestService(t, nil)
port4 := testutils.ParsePort(t, backend4.Address)
defer backend4.Stop()
backend5 := stubserver.StartTestService(t, nil)
port5 := testutils.ParsePort(t, backend5.Address)
defer backend5.Stop()
const serviceName = "my-service-client-side-xds"
tests := []struct {
name string
// Configuration will be specified through the load_balancing_policy field.
wrrLocalityConfiguration *v3wrrlocalitypb.WrrLocality
addressDistributionWant []resolver.Address
}{
{
name: "rr_child",
wrrLocalityConfiguration: wrrLocality(&v3roundrobinpb.RoundRobin{}),
// Each address's expected probability is its locality's weight divided
// by the total locality weight, multiplied by 1 / (number of endpoints
// in that locality), due to round robin across endpoints in a locality.
// Thus, addresses 1 and 2 each have 1/3 * 1/2 = 1/6 probability, and
// addresses 3, 4, and 5 each have 2/3 * 1/3 = 2/9 probability of being
// routed to: 6 picks each of backends 1-2 and 8 picks each of backends
// 3-5 over the 36 expected addresses below.
addressDistributionWant: []resolver.Address{
{Addr: backend1.Address},
{Addr: backend1.Address},
{Addr: backend1.Address},
{Addr: backend1.Address},
{Addr: backend1.Address},
{Addr: backend1.Address},
{Addr: backend2.Address},
{Addr: backend2.Address},
{Addr: backend2.Address},
{Addr: backend2.Address},
{Addr: backend2.Address},
{Addr: backend2.Address},
{Addr: backend3.Address},
{Addr: backend3.Address},
{Addr: backend3.Address},
{Addr: backend3.Address},
{Addr: backend3.Address},
{Addr: backend3.Address},
{Addr: backend3.Address},
{Addr: backend3.Address},
{Addr: backend4.Address},
{Addr: backend4.Address},
{Addr: backend4.Address},
{Addr: backend4.Address},
{Addr: backend4.Address},
{Addr: backend4.Address},
{Addr: backend4.Address},
{Addr: backend4.Address},
{Addr: backend5.Address},
{Addr: backend5.Address},
{Addr: backend5.Address},
{Addr: backend5.Address},
{Addr: backend5.Address},
{Addr: backend5.Address},
{Addr: backend5.Address},
{Addr: backend5.Address},
},
},
// This configures custom LB as the child of wrr_locality, which points
// to our pick_first implementation. Thus, the expected distribution of
// addresses is locality weight of locality / total locality weights as
// the probability of picking the first backend within the locality:
// address 1 for locality 1 (weight 1/3) and address 3 for locality 2
// (weight 2/3), i.e. one pick of backend1 for every two of backend3.
{
name: "custom_lb_child_pick_first",
wrrLocalityConfiguration: wrrLocality(&v3xdsxdstypepb.TypedStruct{
TypeUrl: "type.googleapis.com/pick_first",
Value: &structpb.Struct{},
}),
addressDistributionWant: []resolver.Address{
{Addr: backend1.Address},
{Addr: backend3.Address},
{Addr: backend3.Address},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
managementServer, nodeID, _, r, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()
routeConfigName := "route-" + serviceName
clusterName := "cluster-" + serviceName
endpointsName := "endpoints-" + serviceName
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)},
Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)},
Clusters: []*v3clusterpb.Cluster{clusterWithLBConfiguration(clusterName, endpointsName, e2e.SecurityLevelNone, test.wrrLocalityConfiguration)},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
ClusterName: endpointsName,
Host: "localhost",
Localities: []e2e.LocalityOptions{
{
Ports: []uint32{port1, port2},
Weight: 1,
},
{
Ports: []uint32{port3, port4, port5},
Weight: 2,
},
},
})},
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
if err != nil {
t.Fatalf("Failed to dial local test server: %v", err)
}
defer cc.Close()
client := testgrpc.NewTestServiceClient(cc)
if err := roundrobin.CheckWeightedRoundRobinRPCs(ctx, client, test.addressDistributionWant); err != nil {
t.Fatalf("Error in expected round robin: %v", err)
}
})
}
}


@ -38,7 +38,6 @@ import (
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
@ -394,23 +393,22 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
dms[i].OutlierDetection = outlierDetectionToConfig(cu.OutlierDetection)
}
}
lbCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: dms,
}
// lbPolicy is set only when the policy is ringhash. The default (when it's
// not set) is roundrobin. And similarly, we only need to set XDSLBPolicy
// for ringhash (it also defaults to roundrobin).
if lbp := update.lbPolicy; lbp != nil {
lbCfg.XDSLBPolicy = &internalserviceconfig.BalancerConfig{
Name: ringhash.Name,
Config: &ringhash.LBConfig{
MinRingSize: lbp.MinimumRingSize,
MaxRingSize: lbp.MaximumRingSize,
},
}
bc := &internalserviceconfig.BalancerConfig{}
if err := json.Unmarshal(update.lbPolicy, bc); err != nil {
// This should never occur: the xDS client only emits valid
// configuration, so validity has already been checked there. This
// double validation exists because unmarshalling and validating are
// coupled into one json.Unmarshal operation; we will switch to two
// separate operations in the future.
b.logger.Errorf("Emitted lbPolicy %s from xDS Client is invalid: %v", update.lbPolicy, err)
return
}
lbCfg.XDSLBPolicy = bc
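// For reference, for a cluster that converts to wrr_locality over
// round_robin, update.lbPolicy is JSON roughly of the form (a sketch;
// exact policy and field names assumed, not verified here):
//
//	[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]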
ccState := balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient),
BalancerConfig: lbCfg,


@ -63,6 +63,7 @@ var (
IdentityInstanceName: "default2",
SubjectAltNameMatchers: testSANMatchers,
},
LBPolicy: wrrLocalityLBConfigJSON,
}
cdsUpdateWithMissingSecurityCfg = xdsresource.ClusterUpdate{
ClusterName: serviceName,
@ -248,8 +249,11 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg)
cdsUpdate := xdsresource.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: wrrLocalityLBConfigJSON,
}
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@ -304,8 +308,11 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) {
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup. No security config is
// passed to the CDS balancer as part of this update.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg)
cdsUpdate := xdsresource.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: wrrLocalityLBConfigJSON,
}
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@ -461,7 +468,7 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg)
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}
@ -495,7 +502,7 @@ func (s) TestGoodSecurityConfig(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg)
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
@ -548,7 +555,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg)
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
@ -564,7 +571,10 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
// an update which contains bad security config. So, we expect the CDS
// balancer to forward this error to the EDS balancer and eventually the
// channel needs to be put in a bad state.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
cdsUpdate := xdsresource.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: wrrLocalityLBConfigJSON,
}
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}
@ -598,7 +608,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg)
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
@ -675,8 +685,9 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) {
RootInstanceName: "default1",
SubjectAltNameMatchers: testSANMatchers,
},
LBPolicy: wrrLocalityLBConfigJSON,
}
wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg)
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@ -700,6 +711,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) {
RootInstanceName: "default2",
SubjectAltNameMatchers: testSANMatchers,
},
LBPolicy: wrrLocalityLBConfigJSON,
}
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)


@ -37,6 +37,7 @@ import (
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
@ -60,6 +61,20 @@ var (
noopODLBCfg = outlierdetection.LBConfig{
Interval: 1<<63 - 1,
}
wrrLocalityLBConfig = &internalserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
Config: &wrrlocality.LBConfig{
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: "round_robin",
},
},
}
wrrLocalityLBConfigJSON, _ = json.Marshal(wrrLocalityLBConfig)
ringHashLBConfig = &internalserviceconfig.BalancerConfig{
Name: ringhash.Name,
Config: &ringhash.LBConfig{MinRingSize: 10, MaxRingSize: 100},
}
ringHashLBConfigJSON, _ = json.Marshal(ringHashLBConfig)
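// Note: internalserviceconfig.BalancerConfig marshals to a single-element
// JSON list keyed by policy name, e.g. roughly (a sketch; exact field
// names assumed):
//
//	[{"ring_hash_experimental": {"minRingSize": 10, "maxRingSize": 100}}]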
)
type s struct {
@ -381,20 +396,27 @@ func (s) TestHandleClusterUpdate(t *testing.T) {
wantCCS balancer.ClientConnState
}{
{
name: "happy-case-with-lrs",
cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, LRSServerConfig: xdsresource.ClusterLRSServerSelf},
wantCCS: edsCCS(serviceName, nil, true, nil, noopODLBCfg),
name: "happy-case-with-lrs",
cdsUpdate: xdsresource.ClusterUpdate{
ClusterName: serviceName,
LRSServerConfig: xdsresource.ClusterLRSServerSelf,
LBPolicy: wrrLocalityLBConfigJSON,
},
wantCCS: edsCCS(serviceName, nil, true, wrrLocalityLBConfig, noopODLBCfg),
},
{
name: "happy-case-without-lrs",
cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName},
wantCCS: edsCCS(serviceName, nil, false, nil, noopODLBCfg),
name: "happy-case-without-lrs",
cdsUpdate: xdsresource.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: wrrLocalityLBConfigJSON,
},
wantCCS: edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg),
},
{
name: "happy-case-with-ring-hash-lb-policy",
cdsUpdate: xdsresource.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: &xdsresource.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100},
LBPolicy: ringHashLBConfigJSON,
},
wantCCS: edsCCS(serviceName, nil, false, &internalserviceconfig.BalancerConfig{
Name: ringhash.Name,
@ -403,21 +425,25 @@ func (s) TestHandleClusterUpdate(t *testing.T) {
},
{
name: "happy-case-outlier-detection",
cdsUpdate: xdsresource.ClusterUpdate{ClusterName: serviceName, OutlierDetection: &xdsresource.OutlierDetection{
Interval: 10 * time.Second,
BaseEjectionTime: 30 * time.Second,
MaxEjectionTime: 300 * time.Second,
MaxEjectionPercent: 10,
SuccessRateStdevFactor: 1900,
EnforcingSuccessRate: 100,
SuccessRateMinimumHosts: 5,
SuccessRateRequestVolume: 100,
FailurePercentageThreshold: 85,
EnforcingFailurePercentage: 5,
FailurePercentageMinimumHosts: 5,
FailurePercentageRequestVolume: 50,
}},
wantCCS: edsCCS(serviceName, nil, false, nil, outlierdetection.LBConfig{
cdsUpdate: xdsresource.ClusterUpdate{
ClusterName: serviceName,
OutlierDetection: &xdsresource.OutlierDetection{
Interval: 10 * time.Second,
BaseEjectionTime: 30 * time.Second,
MaxEjectionTime: 300 * time.Second,
MaxEjectionPercent: 10,
SuccessRateStdevFactor: 1900,
EnforcingSuccessRate: 100,
SuccessRateMinimumHosts: 5,
SuccessRateRequestVolume: 100,
FailurePercentageThreshold: 85,
EnforcingFailurePercentage: 5,
FailurePercentageMinimumHosts: 5,
FailurePercentageRequestVolume: 50,
},
LBPolicy: wrrLocalityLBConfigJSON,
},
wantCCS: edsCCS(serviceName, nil, false, wrrLocalityLBConfig, outlierdetection.LBConfig{
Interval: 10 * time.Second,
BaseEjectionTime: 30 * time.Second,
MaxEjectionTime: 300 * time.Second,
@ -501,8 +527,11 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg)
cdsUpdate := xdsresource.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: wrrLocalityLBConfigJSON,
}
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}
@ -586,8 +615,11 @@ func (s) TestResolverError(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg)
cdsUpdate := xdsresource.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: wrrLocalityLBConfigJSON,
}
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}
@ -635,8 +667,11 @@ func (s) TestUpdateSubConnState(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg)
cdsUpdate := xdsresource.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: wrrLocalityLBConfigJSON,
}
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@ -665,13 +700,16 @@ func (s) TestCircuitBreaking(t *testing.T) {
cancel()
cdsB.Close()
}()
// Here we invoke the watch callback registered on the fake xdsClient. This
// will trigger the watch handler on the CDS balancer, which will update
// the service's counter with the new max requests.
var maxRequests uint32 = 1
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: clusterName, MaxRequests: &maxRequests}
wantCCS := edsCCS(clusterName, &maxRequests, false, nil, noopODLBCfg)
cdsUpdate := xdsresource.ClusterUpdate{
ClusterName: clusterName,
MaxRequests: &maxRequests,
LBPolicy: wrrLocalityLBConfigJSON,
}
wantCCS := edsCCS(clusterName, &maxRequests, false, wrrLocalityLBConfig, noopODLBCfg)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@ -699,14 +737,16 @@ func (s) TestClose(t *testing.T) {
// provided xdsClient.
xdsC, cdsB, edsB, _, cancel := setupWithWatch(t)
defer cancel()
// Here we invoke the watch callback registered on the fake xdsClient. This
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg)
cdsUpdate := xdsresource.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: wrrLocalityLBConfigJSON,
}
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@ -776,8 +816,11 @@ func (s) TestExitIdle(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false, nil, noopODLBCfg)
cdsUpdate := xdsresource.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: wrrLocalityLBConfigJSON,
}
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfig, noopODLBCfg)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {

View File

@ -17,6 +17,7 @@
package cdsbalancer
import (
"encoding/json"
"errors"
"sync"
@ -38,13 +39,9 @@ var (
type clusterHandlerUpdate struct {
// securityCfg is the Security Config from the top (root) cluster.
securityCfg *xdsresource.SecurityConfig
// lbPolicy is the lb policy from the top (root) cluster.
//
// Currently, we only support roundrobin or ringhash, and since roundrobin
// doesn't need configs, this is only set to the ringhash config, if the policy
// is ringhash. In the future, if we support more policies, we can make this
// an interface, and set it to config of the other policies.
lbPolicy *xdsresource.ClusterLBPolicyRingHash
// lbPolicy is the child of the cluster_impl policy, for all priorities.
lbPolicy json.RawMessage
// updates is a list of ClusterUpdates from all the leaf clusters.
updates []xdsresource.ClusterUpdate
@ -123,6 +120,7 @@ func (ch *clusterHandler) constructClusterUpdate() {
case <-ch.updateChannel:
default:
}
ch.updateChannel <- clusterHandlerUpdate{
securityCfg: ch.createdClusters[ch.rootClusterName].clusterUpdate.SecurityCfg,
lbPolicy: ch.createdClusters[ch.rootClusterName].clusterUpdate.LBPolicy,


@ -52,7 +52,6 @@ func (s) TestSuccessCaseLeafNode(t *testing.T) {
name string
clusterName string
clusterUpdate xdsresource.ClusterUpdate
lbPolicy *xdsresource.ClusterLBPolicyRingHash
}{
{
name: "test-update-root-cluster-EDS-success",
@ -62,16 +61,6 @@ func (s) TestSuccessCaseLeafNode(t *testing.T) {
ClusterName: edsService,
},
},
{
name: "test-update-root-cluster-EDS-with-ring-hash",
clusterName: logicalDNSService,
clusterUpdate: xdsresource.ClusterUpdate{
ClusterType: xdsresource.ClusterTypeLogicalDNS,
ClusterName: logicalDNSService,
LBPolicy: &xdsresource.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100},
},
lbPolicy: &xdsresource.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100},
},
{
name: "test-update-root-cluster-Logical-DNS-success",
clusterName: logicalDNSService,
@ -111,9 +100,6 @@ func (s) TestSuccessCaseLeafNode(t *testing.T) {
if diff := cmp.Diff(chu.updates, []xdsresource.ClusterUpdate{test.clusterUpdate}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
if diff := cmp.Diff(chu.lbPolicy, test.lbPolicy); diff != "" {
t.Fatalf("got unexpected lb policy in cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for update from update channel.")
}


@ -112,9 +112,14 @@ func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) {
// drops all RPCs, but with no change in the load reporting server config.
resources.Endpoints = []*v3endpointpb.ClusterLoadAssignment{
e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
ClusterName: "endpoints-" + serviceName,
Host: "localhost",
Ports: []uint32{testutils.ParsePort(t, server.Address)},
ClusterName: "endpoints-" + serviceName,
Host: "localhost",
Localities: []e2e.LocalityOptions{
{
Ports: []uint32{testutils.ParsePort(t, server.Address)},
Weight: 1,
},
},
DropPercents: map[string]int{"test-drop-everything": 100},
}),
}


@ -27,8 +27,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
@ -378,7 +376,6 @@ func (s) TestOutlierDetection(t *testing.T) {
t.Fatal(err)
}
localityID := xdsinternal.LocalityID{Zone: "zone"}
// The priority configuration generated should have Outlier Detection as a
// direct child due to Outlier Detection being turned on.
pCfgWant := &priority.LBConfig{
@ -393,17 +390,6 @@ func (s) TestOutlierDetection(t *testing.T) {
Config: &clusterimpl.LBConfig{
Cluster: testClusterName,
EDSServiceName: "test-eds-service-name",
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: weightedtarget.Name,
Config: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(localityID.ToString): {
Weight: 100,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
},
},
},
},
},


@ -286,10 +286,6 @@ func TestParseConfig(t *testing.T) {
}
}
func newString(s string) *string {
return &s
}
func newUint32(i uint32) *uint32 {
return &i
}


@ -23,9 +23,7 @@ import (
"fmt"
"sort"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/hierarchy"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
@ -34,7 +32,7 @@ import (
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
@ -63,33 +61,6 @@ type priorityConfig struct {
//
// The built tree of balancers (see test for the output struct).
//
// If xds lb policy is ROUND_ROBIN, the children will be weighted_target for
// locality picking, and round_robin for endpoint picking.
//
// ┌────────┐
// │priority│
// └┬──────┬┘
// │ │
// ┌───────────▼┐ ┌▼───────────┐
// │cluster_impl│ │cluster_impl│
// └─┬──────────┘ └──────────┬─┘
// │ │
// ┌──────────────▼─┐ ┌─▼──────────────┐
// │locality_picking│ │locality_picking│
// └┬──────────────┬┘ └┬──────────────┬┘
// │ │ │ │
// ┌─▼─┐ ┌─▼─┐ ┌─▼─┐ ┌─▼─┐
// │LRS│ │LRS│ │LRS│ │LRS│
// └─┬─┘ └─┬─┘ └─┬─┘ └─┬─┘
// │ │ │ │
// ┌──────────▼─────┐ ┌─────▼──────────┐ ┌──────────▼─────┐ ┌─────▼──────────┐
// │endpoint_picking│ │endpoint_picking│ │endpoint_picking│ │endpoint_picking│
// └────────────────┘ └────────────────┘ └────────────────┘ └────────────────┘
//
// If xds lb policy is RING_HASH, the children will be just a ring_hash policy.
// The endpoints from all localities will be flattened to one addresses list,
// and the ring_hash policy will pick endpoints from it.
//
// ┌────────┐
// │priority│
// └┬──────┬┘
@ -99,13 +70,8 @@ type priorityConfig struct {
// └──────┬─────┘ └─────┬──────┘
// │ │
// ┌──────▼─────┐ ┌─────▼──────┐
// │ ring_hash │ │ ring_hash │
// │xDSLBPolicy │ │xDSLBPolicy │ (Locality and Endpoint picking layer)
// └────────────┘ └────────────┘
//
// If endpointPickingPolicy is nil, roundrobin will be used.
//
// Custom locality picking policy isn't supported, and weighted_target is
// always used.
func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) {
pc, addrs, err := buildPriorityConfig(priorities, xdsLBPolicy)
if err != nil {
@ -284,55 +250,11 @@ func dedupSortedIntSlice(a []int) []int {
return a[:i+1]
}
// rrBalancerConfig is a const roundrobin config, used as the child of
// weighted-roundrobin, to avoid allocating memory every time.
var rrBalancerConfig = &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}
// priorityLocalitiesToClusterImpl takes a list of localities (with the same
// priority), and generates a cluster impl policy config, and a list of
// addresses.
// addresses with their path hierarchy set to [priority-name, locality-name], so
// priority and the xDS LB Policy know which child policy each address is for.
func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) {
clusterImplCfg := &clusterimpl.LBConfig{
Cluster: mechanism.Cluster,
EDSServiceName: mechanism.EDSServiceName,
LoadReportingServer: mechanism.LoadReportingServer,
MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
DropCategories: drops,
// ChildPolicy is not set. Will be set based on xdsLBPolicy
}
if xdsLBPolicy == nil || xdsLBPolicy.Name == roundrobin.Name {
// If lb policy is ROUND_ROBIN:
// - locality-picking policy is weighted_target
// - endpoint-picking policy is round_robin
logger.Infof("xds lb policy is %q, building config with weighted_target + round_robin", roundrobin.Name)
// Child of weighted_target is hardcoded to round_robin.
wtConfig, addrs := localitiesToWeightedTarget(localities, priorityName, rrBalancerConfig)
clusterImplCfg.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: weightedtarget.Name, Config: wtConfig}
return clusterImplCfg, addrs, nil
}
if xdsLBPolicy.Name == ringhash.Name {
// If lb policy is RING_HASH, will build one ring_hash policy as child.
// The endpoints from all localities will be flattened to one addresses
// list, and the ring_hash policy will pick endpoints from it.
logger.Infof("xds lb policy is %q, building config with ring_hash", ringhash.Name)
addrs := localitiesToRingHash(localities, priorityName)
// Set child to ring_hash, note that the ring_hash config is from
// xdsLBPolicy.
clusterImplCfg.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: ringhash.Name, Config: xdsLBPolicy.Config}
return clusterImplCfg, addrs, nil
}
return nil, nil, fmt.Errorf("unsupported xds LB policy %q, not one of {%q,%q}", xdsLBPolicy.Name, roundrobin.Name, ringhash.Name)
}
// localitiesToRingHash takes a list of localities (with the same priority), and
// generates a list of addresses.
//
// The addresses have path hierarchy set to [priority-name], so priority knows
// which child policy they are for.
func localitiesToRingHash(localities []xdsresource.Locality, priorityName string) []resolver.Address {
var addrs []resolver.Address
for _, locality := range localities {
var lw uint32 = 1
@ -350,54 +272,29 @@ func localitiesToRingHash(localities []xdsresource.Locality, priorityName string
if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown {
continue
}
addr := resolver.Address{Addr: endpoint.Address}
addr = hierarchy.Set(addr, []string{priorityName, localityStr})
addr = internal.SetLocalityID(addr, locality.ID)
// "To provide the xds_wrr_locality load balancer information about
// locality weights received from EDS, the cluster resolver will
// populate a new locality weight attribute for each address. The
// attribute will have the weight (as an integer) of the locality
// the address is part of." - A52
addr = wrrlocality.SetAddrInfo(addr, wrrlocality.AddrInfo{LocalityWeight: lw})
var ew uint32 = 1
if endpoint.Weight != 0 {
ew = endpoint.Weight
}
// The weight of each endpoint is locality_weight * endpoint_weight.
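// For example, a locality weight of 20 and an endpoint weight of 90
// yield an address weight of 20 * 90 = 1800.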
ai := weightedroundrobin.AddrInfo{Weight: lw * ew}
addr := weightedroundrobin.SetAddrInfo(resolver.Address{Addr: endpoint.Address}, ai)
addr = hierarchy.Set(addr, []string{priorityName, localityStr})
addr = internal.SetLocalityID(addr, locality.ID)
addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: lw * ew})
addrs = append(addrs, addr)
}
}
return addrs
}
// localitiesToWeightedTarget takes a list of localities (with the same
// priority), and generates a weighted target config, and list of addresses.
//
// The addresses have path hierarchy set to [priority-name, locality-name], so
// priority and weighted target know which child policy they are for.
func localitiesToWeightedTarget(localities []xdsresource.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig) (*weightedtarget.LBConfig, []resolver.Address) {
weightedTargets := make(map[string]weightedtarget.Target)
var addrs []resolver.Address
for _, locality := range localities {
localityStr, err := locality.ID.ToString()
if err != nil {
localityStr = fmt.Sprintf("%+v", locality.ID)
}
weightedTargets[localityStr] = weightedtarget.Target{Weight: locality.Weight, ChildPolicy: childPolicy}
for _, endpoint := range locality.Endpoints {
// Filter out all "unhealthy" endpoints (unknown and healthy are
// both considered to be healthy:
// https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown {
continue
}
addr := resolver.Address{Addr: endpoint.Address}
if childPolicy.Name == weightedroundrobin.Name && endpoint.Weight != 0 {
ai := weightedroundrobin.AddrInfo{Weight: endpoint.Weight}
addr = weightedroundrobin.SetAddrInfo(addr, ai)
}
addr = hierarchy.Set(addr, []string{priorityName, localityStr})
addr = internal.SetLocalityID(addr, locality.ID)
addrs = append(addrs, addr)
}
}
return &weightedtarget.LBConfig{Targets: weightedTargets}, addrs
return &clusterimpl.LBConfig{
Cluster: mechanism.Cluster,
EDSServiceName: mechanism.EDSServiceName,
LoadReportingServer: mechanism.LoadReportingServer,
MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
DropCategories: drops,
ChildPolicy: xdsLBPolicy,
}, addrs, nil
}


@ -30,7 +30,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/internal/hierarchy"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
@ -39,6 +38,7 @@ import (
"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
@ -68,7 +68,8 @@ var (
return out[i].Addr < out[j].Addr
})
return out
})}
}),
}
noopODCfg = outlierdetection.LBConfig{
Interval: 1<<63 - 1,
@ -230,21 +231,6 @@ func TestBuildPriorityConfig(t *testing.T) {
Cluster: testClusterName,
EDSServiceName: testEDSServiceName,
DropCategories: []clusterimpl.DropConfig{},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: weightedtarget.Name,
Config: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(testLocalityIDs[0].ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
assertString(testLocalityIDs[1].ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
},
},
},
},
},
@ -262,21 +248,6 @@ func TestBuildPriorityConfig(t *testing.T) {
Cluster: testClusterName,
EDSServiceName: testEDSServiceName,
DropCategories: []clusterimpl.DropConfig{},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: weightedtarget.Name,
Config: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(testLocalityIDs[2].ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
assertString(testLocalityIDs[3].ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
},
},
},
},
},
@ -393,21 +364,6 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) {
RequestsPerMillion: testDropOverMillion,
},
},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: weightedtarget.Name,
Config: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(testLocalityIDs[0].ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
assertString(testLocalityIDs[1].ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
},
},
},
"priority-2-1": {
Cluster: testClusterName,
@ -420,32 +376,17 @@ func TestBuildClusterImplConfigForEDS(t *testing.T) {
RequestsPerMillion: testDropOverMillion,
},
},
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: weightedtarget.Name,
Config: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(testLocalityIDs[2].ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
assertString(testLocalityIDs[3].ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
},
},
},
}
wantAddrs := []resolver.Address{
testAddrWithAttrs(testAddressStrs[0][0], nil, "priority-2-0", &testLocalityIDs[0]),
testAddrWithAttrs(testAddressStrs[0][1], nil, "priority-2-0", &testLocalityIDs[0]),
testAddrWithAttrs(testAddressStrs[1][0], nil, "priority-2-0", &testLocalityIDs[1]),
testAddrWithAttrs(testAddressStrs[1][1], nil, "priority-2-0", &testLocalityIDs[1]),
testAddrWithAttrs(testAddressStrs[2][0], nil, "priority-2-1", &testLocalityIDs[2]),
testAddrWithAttrs(testAddressStrs[2][1], nil, "priority-2-1", &testLocalityIDs[2]),
testAddrWithAttrs(testAddressStrs[3][0], nil, "priority-2-1", &testLocalityIDs[3]),
testAddrWithAttrs(testAddressStrs[3][1], nil, "priority-2-1", &testLocalityIDs[3]),
testAddrWithAttrs(testAddressStrs[0][0], 20, 1, "priority-2-0", &testLocalityIDs[0]),
testAddrWithAttrs(testAddressStrs[0][1], 20, 1, "priority-2-0", &testLocalityIDs[0]),
testAddrWithAttrs(testAddressStrs[1][0], 80, 1, "priority-2-0", &testLocalityIDs[1]),
testAddrWithAttrs(testAddressStrs[1][1], 80, 1, "priority-2-0", &testLocalityIDs[1]),
testAddrWithAttrs(testAddressStrs[2][0], 20, 1, "priority-2-1", &testLocalityIDs[2]),
testAddrWithAttrs(testAddressStrs[2][1], 20, 1, "priority-2-1", &testLocalityIDs[2]),
testAddrWithAttrs(testAddressStrs[3][0], 80, 1, "priority-2-1", &testLocalityIDs[3]),
testAddrWithAttrs(testAddressStrs[3][1], 80, 1, "priority-2-1", &testLocalityIDs[3]),
}
if diff := cmp.Diff(gotNames, wantNames); diff != "" {
@ -594,31 +535,13 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) {
wantConfig: &clusterimpl.LBConfig{
Cluster: testClusterName,
EDSServiceName: testEDSService,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: weightedtarget.Name,
Config: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
},
},
},
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
wantAddrs: []resolver.Address{
testAddrWithAttrs("addr-1-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-1-1", 20, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", 20, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", 80, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", 80, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
},
},
{
@ -651,26 +574,12 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) {
},
},
wantAddrs: []resolver.Address{
testAddrWithAttrs("addr-1-1", newUint32(1800), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", newUint32(200), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", newUint32(7200), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", newUint32(800), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-1-1", 20, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", 20, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", 80, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", 80, 10, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
},
},
{
name: "unsupported child",
localities: []xdsresource.Locality{{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90},
{Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10},
},
ID: internal.LocalityID{Zone: "test-zone-1"},
Weight: 20,
}},
priorityName: "test-priority",
childPolicy: &internalserviceconfig.BalancerConfig{Name: "some-child"},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -688,267 +597,6 @@ func TestPriorityLocalitiesToClusterImpl(t *testing.T) {
}
}
func TestLocalitiesToWeightedTarget(t *testing.T) {
tests := []struct {
name string
localities []xdsresource.Locality
priorityName string
childPolicy *internalserviceconfig.BalancerConfig
lrsServer *string
wantConfig *weightedtarget.LBConfig
wantAddrs []resolver.Address
}{
{
name: "roundrobin as child, with LRS",
localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy},
{Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy},
},
ID: internal.LocalityID{Zone: "test-zone-1"},
Weight: 20,
},
{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy},
{Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy},
},
ID: internal.LocalityID{Zone: "test-zone-2"},
Weight: 80,
},
},
priorityName: "test-priority",
childPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
lrsServer: newString("test-lrs-server"),
wantConfig: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
},
},
},
wantAddrs: []resolver.Address{
testAddrWithAttrs("addr-1-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
},
},
{
name: "roundrobin as child, no LRS",
localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy},
{Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy},
},
ID: internal.LocalityID{Zone: "test-zone-1"},
Weight: 20,
},
{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy},
{Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy},
},
ID: internal.LocalityID{Zone: "test-zone-2"},
Weight: 80,
},
},
priorityName: "test-priority",
childPolicy: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name},
// lrsServer is nil, so LRS policy will not be used.
wantConfig: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
},
},
wantAddrs: []resolver.Address{
testAddrWithAttrs("addr-1-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", nil, "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
},
},
{
name: "weighted round robin as child, no LRS",
localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90},
{Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10},
},
ID: internal.LocalityID{Zone: "test-zone-1"},
Weight: 20,
},
{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90},
{Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10},
},
ID: internal.LocalityID{Zone: "test-zone-2"},
Weight: 80,
},
},
priorityName: "test-priority",
childPolicy: &internalserviceconfig.BalancerConfig{Name: weightedroundrobin.Name},
// lrsServer is nil, so LRS policy will not be used.
wantConfig: &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
assertString(internal.LocalityID{Zone: "test-zone-1"}.ToString): {
Weight: 20,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: weightedroundrobin.Name,
},
},
assertString(internal.LocalityID{Zone: "test-zone-2"}.ToString): {
Weight: 80,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: weightedroundrobin.Name,
},
},
},
},
wantAddrs: []resolver.Address{
testAddrWithAttrs("addr-1-1", newUint32(90), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", newUint32(10), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", newUint32(90), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", newUint32(10), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1 := localitiesToWeightedTarget(tt.localities, tt.priorityName, tt.childPolicy)
if diff := cmp.Diff(got, tt.wantConfig); diff != "" {
t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff)
}
if diff := cmp.Diff(got1, tt.wantAddrs, cmp.AllowUnexported(attributes.Attributes{})); diff != "" {
t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff)
}
})
}
}
func TestLocalitiesToRingHash(t *testing.T) {
tests := []struct {
name string
localities []xdsresource.Locality
priorityName string
wantAddrs []resolver.Address
}{
{
// Check that address weights are locality_weight * endpoint_weight.
name: "with locality and endpoint weight",
localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90},
{Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10},
},
ID: internal.LocalityID{Zone: "test-zone-1"},
Weight: 20,
},
{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90},
{Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10},
},
ID: internal.LocalityID{Zone: "test-zone-2"},
Weight: 80,
},
},
priorityName: "test-priority",
wantAddrs: []resolver.Address{
testAddrWithAttrs("addr-1-1", newUint32(1800), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", newUint32(200), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", newUint32(7200), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", newUint32(800), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
},
},
{
// Check that when endpoint_weight is unset, the weight is the locality weight.
name: "locality weight only",
localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy},
{Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy},
},
ID: internal.LocalityID{Zone: "test-zone-1"},
Weight: 20,
},
{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy},
{Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy},
},
ID: internal.LocalityID{Zone: "test-zone-2"},
Weight: 80,
},
},
priorityName: "test-priority",
wantAddrs: []resolver.Address{
testAddrWithAttrs("addr-1-1", newUint32(20), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", newUint32(20), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", newUint32(80), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", newUint32(80), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
},
},
{
// Check that when locality_weight is unset, the weight is the endpoint weight.
name: "endpoint weight only",
localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-1-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90},
{Address: "addr-1-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10},
},
ID: internal.LocalityID{Zone: "test-zone-1"},
},
{
Endpoints: []xdsresource.Endpoint{
{Address: "addr-2-1", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 90},
{Address: "addr-2-2", HealthStatus: xdsresource.EndpointHealthStatusHealthy, Weight: 10},
},
ID: internal.LocalityID{Zone: "test-zone-2"},
},
},
priorityName: "test-priority",
wantAddrs: []resolver.Address{
testAddrWithAttrs("addr-1-1", newUint32(90), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-1-2", newUint32(10), "test-priority", &internal.LocalityID{Zone: "test-zone-1"}),
testAddrWithAttrs("addr-2-1", newUint32(90), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
testAddrWithAttrs("addr-2-2", newUint32(10), "test-priority", &internal.LocalityID{Zone: "test-zone-2"}),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := localitiesToRingHash(tt.localities, tt.priorityName)
if diff := cmp.Diff(got, tt.wantAddrs, cmp.AllowUnexported(attributes.Attributes{})); diff != "" {
t.Errorf("localitiesToWeightedTarget() diff (-got +want) %v", diff)
}
})
}
}
func assertString(f func() (string, error)) string {
s, err := f()
if err != nil {
@ -957,17 +605,16 @@ func assertString(f func() (string, error)) string {
return s
}
func testAddrWithAttrs(addrStr string, weight *uint32, priority string, lID *internal.LocalityID) resolver.Address {
func testAddrWithAttrs(addrStr string, localityWeight, endpointWeight uint32, priority string, lID *internal.LocalityID) resolver.Address {
addr := resolver.Address{Addr: addrStr}
if weight != nil {
addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: *weight})
}
path := []string{priority}
if lID != nil {
path = append(path, assertString(lID.ToString))
addr = internal.SetLocalityID(addr, *lID)
}
addr = hierarchy.Set(addr, path)
addr = wrrlocality.SetAddrInfo(addr, wrrlocality.AddrInfo{LocalityWeight: localityWeight})
addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: localityWeight * endpointWeight})
return addr
}
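A minimal sketch of the weight math the updated helper encodes, assuming this test file's imports (including fmt); the helper and its weightExample wrapper below are illustrative, not part of the commit. The effective weighted round robin weight is the product of the locality and endpoint weights, so a locality weight of 20 and an endpoint weight of 90 yield 20 * 90 = 1800, matching the ring-hash expectations above.
// weightExample is an illustrative helper, not part of this commit.
func weightExample() {
	addr := testAddrWithAttrs("addr-1-1", 20, 90, "test-priority", &internal.LocalityID{Zone: "test-zone-1"})
	fmt.Println(weightedroundrobin.GetAddrInfo(addr)) // prints "Weight: 1800"
}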

View File

@ -364,23 +364,6 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) {
if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, wantAddrs); err != nil {
t.Fatal(err)
}
// Change the weight of locality2 and ensure weighted roundrobin. Since
// locality2 has twice the weight of locality3, it will be picked twice as
// frequently as locality3 for RPCs. And since locality2 has a single
// backend and locality3 has two backends, the backend in locality2 will
// receive four times the traffic of each of locality3's backends.
resources = clientEndpointsResource(nodeID, edsServiceName, []localityInfo{
{name: localityName2, weight: 2, ports: ports[1:2]},
{name: localityName3, weight: 1, ports: ports[2:4]},
})
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
wantAddrs = []resolver.Address{addrs[1], addrs[1], addrs[1], addrs[1], addrs[2], addrs[3]}
if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, wantAddrs); err != nil {
t.Fatal(err)
}
}
// TestEDS_EndpointsHealth tests the cluster_resolver LB policy using an EDS

View File

@ -26,6 +26,7 @@ import (
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/connectivity"
@ -35,15 +36,24 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
)
var (
testClusterNames = []string{"test-cluster-1", "test-cluster-2"}
testSubZones = []string{"I", "II", "III", "IV"}
testEndpointAddrs []string
testClusterNames = []string{"test-cluster-1", "test-cluster-2"}
testSubZones = []string{"I", "II", "III", "IV"}
testEndpointAddrs []string
wrrLocalityLBConfig = &internalserviceconfig.BalancerConfig{
Name: wrrlocality.Name,
Config: &wrrlocality.LBConfig{
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: "round_robin",
},
},
}
)
const testBackendAddrsCount = 12
@ -75,6 +85,7 @@ func setupTestEDS(t *testing.T, initChild *internalserviceconfig.BalancerConfig)
Cluster: testClusterName,
Type: DiscoveryMechanismTypeEDS,
}},
XDSLBPolicy: wrrLocalityLBConfig,
},
}); err != nil {
edsb.Close()
@ -844,6 +855,7 @@ func (s) TestFallbackToDNS(t *testing.T) {
DNSHostname: testDNSTarget,
},
},
XDSLBPolicy: wrrLocalityLBConfig,
},
}); err != nil {
t.Fatal(err)

View File

@ -28,8 +28,12 @@ import (
"fmt"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/internal/grpclog"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
)
// Name is the name of wrr_locality balancer.
@ -45,10 +49,6 @@ func (bb) Name() string {
return Name
}
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
return nil
}
// LBConfig is the config for the wrr locality balancer.
type LBConfig struct {
serviceconfig.LoadBalancingConfig
@ -56,13 +56,146 @@ type LBConfig struct {
ChildPolicy *internalserviceconfig.BalancerConfig `json:"childPolicy,omitempty"`
}
// To plumb in a different child in tests.
var weightedTargetName = weightedtarget.Name
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
builder := balancer.Get(weightedTargetName)
if builder == nil {
// Shouldn't happen, since the weighted target balancer is registered
// via the weightedtarget import; defensive programming.
return nil
}
// Doesn't need to intercept any balancer.ClientConn operations; pass
// through by just giving cc to the child balancer.
wtb := builder.Build(cc, bOpts)
if wtb == nil {
// Shouldn't happen; defensive programming.
return nil
}
wtbCfgParser, ok := builder.(balancer.ConfigParser)
if !ok {
// Shouldn't happen, imported weighted target builder has this method.
return nil
}
wrrL := &wrrLocalityBalancer{
child: wtb,
childParser: wtbCfgParser,
}
wrrL.logger = prefixLogger(wrrL)
wrrL.logger.Infof("Created")
return wrrL
}
func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var lbCfg *LBConfig
if err := json.Unmarshal(s, &lbCfg); err != nil {
return nil, fmt.Errorf("xds: invalid LBConfig for wrrlocality: %s, error: %v", string(s), err)
return nil, fmt.Errorf("xds_wrr_locality: invalid LBConfig: %s, error: %v", string(s), err)
}
if lbCfg == nil || lbCfg.ChildPolicy == nil {
return nil, errors.New("xds: invalidw LBConfig for wrrlocality: child policy field must be set")
return nil, errors.New("xds_wrr_locality: invalid LBConfig: child policy field must be set")
}
return lbCfg, nil
}
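For reference, a minimal sketch of a configuration this parser accepts, assuming the JSON shape implied by the LBConfig struct above and that round_robin is registered (as it is in grpc-go by default); the parseExample function is illustrative, not part of the commit.
// parseExample is an illustrative helper, not part of this commit.
func parseExample() (*LBConfig, error) {
	cfg, err := bb{}.ParseConfig(json.RawMessage(`{"childPolicy": [{"round_robin": {}}]}`))
	if err != nil {
		// ParseConfig rejects configs whose childPolicy is absent or malformed.
		return nil, err
	}
	return cfg.(*LBConfig), nil // ChildPolicy.Name == "round_robin"
}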
type attributeKey struct{}
// Equal allows the values to be compared by Attributes.Equal.
func (a AddrInfo) Equal(o interface{}) bool {
oa, ok := o.(AddrInfo)
return ok && oa.LocalityWeight == a.LocalityWeight
}
// AddrInfo holds the locality weight of the locality an address is a part of.
type AddrInfo struct {
LocalityWeight uint32
}
// SetAddrInfo returns a copy of addr in which the BalancerAttributes field is
// updated with AddrInfo.
func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
addr.BalancerAttributes = addr.BalancerAttributes.WithValue(attributeKey{}, addrInfo)
return addr
}
func (a AddrInfo) String() string {
return fmt.Sprintf("Locality Weight: %d", a.LocalityWeight)
}
// getAddrInfo returns the AddrInfo stored in the BalancerAttributes field of
// addr. Returns false if no AddrInfo found.
func getAddrInfo(addr resolver.Address) (AddrInfo, bool) {
v := addr.BalancerAttributes.Value(attributeKey{})
ai, ok := v.(AddrInfo)
return ai, ok
}
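A short usage sketch of the attribute helpers above (illustrative only; the address and the setAndGetExample wrapper are hypothetical, and the file's existing fmt and resolver imports are assumed):
// setAndGetExample is an illustrative helper, not part of this commit.
func setAndGetExample() {
	addr := resolver.Address{Addr: "10.0.0.1:8080"} // hypothetical address
	addr = SetAddrInfo(addr, AddrInfo{LocalityWeight: 2})
	if ai, ok := getAddrInfo(addr); ok {
		fmt.Println(ai) // prints "Locality Weight: 2"
	}
}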
// wrrLocalityBalancer wraps a weighted target balancer, and builds
// configuration for the weighted target once it receives configuration
// specifying the weighted target child balancer and locality weight
// information.
type wrrLocalityBalancer struct {
// child will be a weighted target balancer, built at wrrLocalityBalancer
// build time. Other than preparing configuration, balancer operations are
// simply passed through.
child balancer.Balancer
childParser balancer.ConfigParser
logger *grpclog.PrefixLogger
}
func (b *wrrLocalityBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
lbCfg, ok := s.BalancerConfig.(*LBConfig)
if !ok {
b.logger.Errorf("Received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
return balancer.ErrBadResolverState
}
weightedTargets := make(map[string]weightedtarget.Target)
for _, addr := range s.ResolverState.Addresses {
// Fetching the LocalityID here could potentially return a zero value.
// This shouldn't happen, though, since the attribute is set when the
// localities are built in the first place. Thus, don't error out; just
// build a weighted target with undefined behavior.
locality, err := internal.GetLocalityID(addr).ToString()
if err != nil {
// Should never happen.
logger.Errorf("Failed to marshal LocalityID: %v, skipping this locality in weighted target")
}
ai, ok := getAddrInfo(addr)
if !ok {
return fmt.Errorf("xds_wrr_locality: missing locality weight information in address %q", addr)
}
weightedTargets[locality] = weightedtarget.Target{Weight: ai.LocalityWeight, ChildPolicy: lbCfg.ChildPolicy}
}
wtCfg := &weightedtarget.LBConfig{Targets: weightedTargets}
wtCfgJSON, err := json.Marshal(wtCfg)
if err != nil {
// Shouldn't happen.
return fmt.Errorf("xds_wrr_locality: error marshalling prepared config: %v", wtCfg)
}
var sc serviceconfig.LoadBalancingConfig
if sc, err = b.childParser.ParseConfig(wtCfgJSON); err != nil {
return fmt.Errorf("xds_wrr_locality: config generated %v is invalid: %v", wtCfgJSON, err)
}
return b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
BalancerConfig: sc,
})
}
func (b *wrrLocalityBalancer) ResolverError(err error) {
b.child.ResolverError(err)
}
func (b *wrrLocalityBalancer) UpdateSubConnState(sc balancer.SubConn, scState balancer.SubConnState) {
b.child.UpdateSubConnState(sc, scState)
}
func (b *wrrLocalityBalancer) Close() {
b.child.Close()
}
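To make the transformation concrete, here is a sketch of the intermediate weighted target JSON that UpdateClientConnState builds for two localities with weights 2 and 1 and a round_robin child. The shape follows weightedtarget.LBConfig's JSON tags and the key strings are the JSON-marshalled LocalityIDs; this is an assumption derived from the code above, not text from the commit.
// Illustrative only: roughly what wtCfgJSON looks like before the child's
// ConfigParser turns it back into a weightedtarget.LBConfig.
var exampleWtCfgJSON = []byte(`{
	"targets": {
		"{\"region\":\"region-1\",\"zone\":\"zone-1\",\"subZone\":\"subzone-1\"}": {"weight": 2, "childPolicy": [{"round_robin": {}}]},
		"{\"region\":\"region-2\",\"zone\":\"zone-2\",\"subZone\":\"subzone-2\"}": {"weight": 1, "childPolicy": [{"round_robin": {}}]}
	}
}`)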

View File

@ -19,17 +19,28 @@
package wrrlocality
import (
"context"
"encoding/json"
"errors"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedtarget"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpctest"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
)
const (
defaultTestTimeout = 5 * time.Second
)
type s struct {
@ -119,3 +130,123 @@ func (s) TestParseConfig(t *testing.T) {
})
}
}
// TestUpdateClientConnState tests the UpdateClientConnState method of the
// wrr_locality_experimental balancer. This operation should take the
// localities and their weights from the addresses passed in, alongside the
// endpoint picking policy defined in the balancer config, and construct a
// weighted target configuration corresponding to these inputs.
func (s) TestUpdateClientConnState(t *testing.T) {
// Configure the stub balancer defined below as the child policy of
// wrrLocalityBalancer.
cfgCh := testutils.NewChannel()
oldWeightedTargetName := weightedTargetName
defer func() {
weightedTargetName = oldWeightedTargetName
}()
weightedTargetName = "fake_weighted_target"
stub.Register("fake_weighted_target", stub.BalancerFuncs{
ParseConfig: func(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg weightedtarget.LBConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, err
}
return &cfg, nil
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
wtCfg, ok := ccs.BalancerConfig.(*weightedtarget.LBConfig)
if !ok {
return errors.New("child received config that was not a weighted target config")
}
defer cfgCh.Send(wtCfg)
return nil
},
})
builder := balancer.Get(Name)
if builder == nil {
t.Fatalf("balancer.Get(%q) returned nil", Name)
}
tcc := testutils.NewTestClientConn(t)
bal := builder.Build(tcc, balancer.BuildOptions{})
defer bal.Close()
wrrL := bal.(*wrrLocalityBalancer)
// Create addresses in two localities with specific locality weights. This
// represents the addresses the wrr_locality balancer will receive in
// UpdateClientConnState.
addr1 := resolver.Address{
Addr: "locality-1",
}
addr1 = internal.SetLocalityID(addr1, internal.LocalityID{
Region: "region-1",
Zone: "zone-1",
SubZone: "subzone-1",
})
addr1 = SetAddrInfo(addr1, AddrInfo{LocalityWeight: 2})
addr2 := resolver.Address{
Addr: "locality-2",
}
addr2 = internal.SetLocalityID(addr2, internal.LocalityID{
Region: "region-2",
Zone: "zone-2",
SubZone: "subzone-2",
})
addr2 = SetAddrInfo(addr2, AddrInfo{LocalityWeight: 1})
addrs := []resolver.Address{addr1, addr2}
err := wrrL.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &LBConfig{
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: "round_robin",
},
},
ResolverState: resolver.State{
Addresses: addrs,
},
})
if err != nil {
t.Fatalf("Unexpected error from UpdateClientConnState: %v", err)
}
// Note that the inline strings declared as the keys in Targets, built
// from the Locality ID, are not exactly what is shown in the example in
// the gRFC. However, this is an implementation detail that does not
// affect correctness (confirmed with the Java team). The important thing
// is to get the three pieces of information (region, zone, and subzone)
// down to the child layer.
wantWtCfg := &weightedtarget.LBConfig{
Targets: map[string]weightedtarget.Target{
"{\"region\":\"region-1\",\"zone\":\"zone-1\",\"subZone\":\"subzone-1\"}": {
Weight: 2,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: "round_robin",
},
},
"{\"region\":\"region-2\",\"zone\":\"zone-2\",\"subZone\":\"subzone-2\"}": {
Weight: 1,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: "round_robin",
},
},
},
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cfg, err := cfgCh.Receive(ctx)
if err != nil {
t.Fatalf("No signal received from UpdateClientConnState() on the child: %v", err)
}
gotWtCfg, ok := cfg.(*weightedtarget.LBConfig)
if !ok {
// Shouldn't happen; only weighted target configs are sent on this channel.
t.Fatalf("Unexpected config type: %T", cfg)
}
if diff := cmp.Diff(gotWtCfg, wantWtCfg); diff != "" {
t.Fatalf("Child received unexpected config, diff (-got, +want): %v", diff)
}
}

View File

@ -0,0 +1,34 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package wrrlocality
import (
"fmt"
"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
)
const prefix = "[wrrlocality-lb %p] "
var logger = grpclog.Component("xds")
func prefixLogger(p *wrrLocalityBalancer) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
}

View File

@ -70,7 +70,7 @@ func verifyClusterUpdate(ctx context.Context, updateCh *testutils.Channel, wantU
return fmt.Errorf("received update with error type %v, want %v", gotType, wantType)
}
}
cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicyJSON")}
cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy")}
if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" {
return fmt.Errorf("received unepected diff in the cluster resource update: (-want, got):\n%s", diff)
}

View File

@ -134,9 +134,13 @@ func (s) TestEDSWatch(t *testing.T) {
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
ID: internal.LocalityID{SubZone: "subzone"},
Priority: 0,
Weight: 1,
ID: internal.LocalityID{
Region: "region-1",
Zone: "zone-1",
SubZone: "subzone-1",
},
Priority: 0,
Weight: 1,
},
},
},
@ -153,9 +157,13 @@ func (s) TestEDSWatch(t *testing.T) {
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
ID: internal.LocalityID{SubZone: "subzone"},
Priority: 0,
Weight: 1,
ID: internal.LocalityID{
Region: "region-1",
Zone: "zone-1",
SubZone: "subzone-1",
},
Priority: 0,
Weight: 1,
},
},
},
@ -265,9 +273,13 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
ID: internal.LocalityID{SubZone: "subzone"},
Priority: 0,
Weight: 1,
ID: internal.LocalityID{
Region: "region-1",
Zone: "zone-1",
SubZone: "subzone-1",
},
Priority: 0,
Weight: 1,
},
},
},
@ -277,9 +289,13 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost2, edsPort2), Weight: 1}},
ID: internal.LocalityID{SubZone: "subzone"},
Priority: 0,
Weight: 1,
ID: internal.LocalityID{
Region: "region-1",
Zone: "zone-1",
SubZone: "subzone-1",
},
Priority: 0,
Weight: 1,
},
},
},
@ -295,9 +311,13 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
ID: internal.LocalityID{SubZone: "subzone"},
Priority: 0,
Weight: 1,
ID: internal.LocalityID{
Region: "region-1",
Zone: "zone-1",
SubZone: "subzone-1",
},
Priority: 0,
Weight: 1,
},
},
},
@ -307,9 +327,13 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost2, edsPort2), Weight: 1}},
ID: internal.LocalityID{SubZone: "subzone"},
Priority: 0,
Weight: 1,
ID: internal.LocalityID{
Region: "region-1",
Zone: "zone-1",
SubZone: "subzone-1",
},
Priority: 0,
Weight: 1,
},
},
},
@ -460,9 +484,13 @@ func (s) TestEDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
ID: internal.LocalityID{SubZone: "subzone"},
Priority: 0,
Weight: 1,
ID: internal.LocalityID{
Region: "region-1",
Zone: "zone-1",
SubZone: "subzone-1",
},
Priority: 0,
Weight: 1,
},
},
},
@ -541,9 +569,13 @@ func (s) TestEDSWatch_ResourceCaching(t *testing.T) {
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
ID: internal.LocalityID{SubZone: "subzone"},
Priority: 0,
Weight: 1,
ID: internal.LocalityID{
Region: "region-1",
Zone: "zone-1",
SubZone: "subzone-1",
},
Priority: 0,
Weight: 1,
},
},
},
@ -669,9 +701,13 @@ func (s) TestEDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
ID: internal.LocalityID{SubZone: "subzone"},
Priority: 0,
Weight: 1,
ID: internal.LocalityID{
Region: "region-1",
Zone: "zone-1",
SubZone: "subzone-1",
},
Priority: 0,
Weight: 1,
},
},
},
@ -801,9 +837,13 @@ func (s) TestEDSWatch_PartialValid(t *testing.T) {
Localities: []xdsresource.Locality{
{
Endpoints: []xdsresource.Endpoint{{Address: fmt.Sprintf("%s:%d", edsHost1, edsPort1), Weight: 1}},
ID: internal.LocalityID{SubZone: "subzone"},
Priority: 0,
Weight: 1,
ID: internal.LocalityID{
Region: "region-1",
Zone: "zone-1",
SubZone: "subzone-1",
},
Priority: 0,
Weight: 1,
},
},
},

View File

@ -305,7 +305,11 @@ func (s) TestFederation_EndpointsResourceContextParamOrder(t *testing.T) {
{
Endpoints: []xdsresource.Endpoint{{Address: "localhost:666", Weight: 1}},
Weight: 1,
ID: internal.LocalityID{SubZone: "subzone"},
ID: internal.LocalityID{
Region: "region-1",
Zone: "zone-1",
SubZone: "subzone-1",
},
},
},
},

View File

@ -802,7 +802,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
}
cmpOpts := []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicyJSON"),
cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy"),
}
if diff := cmp.Diff(test.wantUpdate, gotUpdate, cmpOpts...); diff != "" {
t.Fatalf("Unexpected diff in metadata, diff (-want +got):\n%s", diff)

View File

@ -357,7 +357,6 @@ func (s) TestValidateCluster_Success(t *testing.T) {
},
wantUpdate: xdsresource.ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName, LRSServerConfig: xdsresource.ClusterLRSServerSelf,
LBPolicy: &xdsresource.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100},
},
wantLBConfig: &internalserviceconfig.BalancerConfig{
Name: "ring_hash_experimental",
@ -589,11 +588,11 @@ func (s) TestValidateCluster_Success(t *testing.T) {
// compare JSON bytes in a test. Thus, marshal into a Balancer
// Config struct and compare on that. Only need to test this JSON
// emission here, as this covers the possible output space.
if diff := cmp.Diff(update, test.wantUpdate, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "LBPolicy", "LBPolicyJSON")); diff != "" {
if diff := cmp.Diff(update, test.wantUpdate, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "LBPolicy")); diff != "" {
t.Errorf("validateClusterAndConstructClusterUpdate(%+v) got diff: %v (-got, +want)", test.cluster, diff)
}
bc := &internalserviceconfig.BalancerConfig{}
if err := json.Unmarshal(update.LBPolicyJSON, bc); err != nil {
if err := json.Unmarshal(update.LBPolicy, bc); err != nil {
t.Fatalf("failed to unmarshal JSON: %v", err)
}
if diff := cmp.Diff(bc, test.wantLBConfig); diff != "" {

View File

@ -52,13 +52,6 @@ const (
ClusterLRSServerSelf
)
// ClusterLBPolicyRingHash represents ring_hash lb policy, and also contains its
// config.
type ClusterLBPolicyRingHash struct {
MinimumRingSize uint64
MaximumRingSize uint64
}
// OutlierDetection is the outlier detection configuration for a cluster.
type OutlierDetection struct {
// Interval is the time interval between ejection analysis sweeps. This can
@ -148,21 +141,9 @@ type ClusterUpdate struct {
// a prioritized list of cluster names.
PrioritizedClusterNames []string
// LBPolicy is the lb policy for this cluster.
//
// This only supports round_robin and ring_hash.
// - if it's nil, the lb policy is round_robin
// - if it's not nil, the lb policy is ring_hash, and this field has the config.
//
// When we add more support policies, this can be made an interface, and
// will be set to different types based on the policy type.
LBPolicy *ClusterLBPolicyRingHash
// LBPolicyJSON represents the locality and endpoint picking policy in JSON,
// which will be the child policy of xds_cluster_impl. Once this field is
// fully supported across the system, the LBPolicy field will switch to
// this field. Right now we keep both to keep the system working even
// though downstream has not added support for this JSON field.
LBPolicyJSON json.RawMessage
// LBPolicy represents the locality and endpoint picking policy in JSON,
// which will be the child policy of xds_cluster_impl.
LBPolicy json.RawMessage
// OutlierDetection is the outlier detection configuration for this cluster.
// If nil, it means this cluster does not use the outlier detection feature.

View File

@ -77,13 +77,11 @@ const (
)
func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
var lbPolicy *ClusterLBPolicyRingHash
var lbCfgJSON json.RawMessage
var lbPolicy json.RawMessage
var err error
switch cluster.GetLbPolicy() {
case v3clusterpb.Cluster_ROUND_ROBIN:
lbPolicy = nil // The default is round_robin, and there's no config to set.
lbCfgJSON = []byte(fmt.Sprintf(`[{%q: {"childPolicy": [{"round_robin": {}}]}}]`, "xds_wrr_locality_experimental"))
lbPolicy = []byte(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`)
case v3clusterpb.Cluster_RING_HASH:
if !envconfig.XDSRingHash {
return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
@ -101,10 +99,9 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
if max := rhc.GetMaximumRingSize(); max != nil {
maxSize = max.GetValue()
}
lbPolicy = &ClusterLBPolicyRingHash{MinimumRingSize: minSize, MaximumRingSize: maxSize}
rhLBCfgJSON := []byte(fmt.Sprintf("{\"minRingSize\": %d, \"maxRingSize\": %d}", minSize, maxSize))
lbCfgJSON = []byte(fmt.Sprintf(`[{%q: %s}]`, "ring_hash_experimental", rhLBCfgJSON))
rhLBCfg := []byte(fmt.Sprintf("{\"minRingSize\": %d, \"maxRingSize\": %d}", minSize, maxSize))
lbPolicy = []byte(fmt.Sprintf(`[{"ring_hash_experimental": %s}]`, rhLBCfg))
default:
return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}
@ -129,7 +126,7 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
}
if cluster.GetLoadBalancingPolicy() != nil && envconfig.XDSCustomLBPolicy {
lbCfgJSON, err = xdslbregistry.ConvertToServiceConfig(cluster.GetLoadBalancingPolicy())
lbPolicy, err = xdslbregistry.ConvertToServiceConfig(cluster.GetLoadBalancingPolicy())
if err != nil {
return ClusterUpdate{}, fmt.Errorf("error converting LoadBalancingPolicy %v in response: %+v: %v", cluster.GetLoadBalancingPolicy(), cluster, err)
}
@ -137,8 +134,8 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
// converted configuration. It will do this by having the gRPC LB policy
// registry parse the configuration." - A52
bc := &internalserviceconfig.BalancerConfig{}
if err := json.Unmarshal(lbCfgJSON, bc); err != nil {
return ClusterUpdate{}, fmt.Errorf("JSON generated from xDS LB policy registry: %s is invalid: %v", pretty.FormatJSON(lbCfgJSON), err)
if err := json.Unmarshal(lbPolicy, bc); err != nil {
return ClusterUpdate{}, fmt.Errorf("JSON generated from xDS LB policy registry: %s is invalid: %v", pretty.FormatJSON(lbPolicy), err)
}
}
@ -147,7 +144,6 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
SecurityCfg: sc,
MaxRequests: circuitBreakersFromCluster(cluster),
LBPolicy: lbPolicy,
LBPolicyJSON: lbCfgJSON,
OutlierDetection: od,
}

View File

@ -322,7 +322,7 @@ func (s) TestValidateClusterWithSecurityConfig_EnvVarOff(t *testing.T) {
if err != nil {
t.Errorf("validateClusterAndConstructClusterUpdate() failed: %v", err)
}
if diff := cmp.Diff(wantUpdate, gotUpdate, cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicyJSON")); diff != "" {
if diff := cmp.Diff(wantUpdate, gotUpdate, cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicy")); diff != "" {
t.Errorf("validateClusterAndConstructClusterUpdate() returned unexpected diff (-want, got):\n%s", diff)
}
}
@ -1215,7 +1215,7 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) {
if (err != nil) != test.wantErr {
t.Errorf("validateClusterAndConstructClusterUpdate() returned err %v wantErr %v)", err, test.wantErr)
}
if diff := cmp.Diff(test.wantUpdate, update, cmpopts.EquateEmpty(), cmp.AllowUnexported(regexp.Regexp{}), cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicyJSON")); diff != "" {
if diff := cmp.Diff(test.wantUpdate, update, cmpopts.EquateEmpty(), cmp.AllowUnexported(regexp.Regexp{}), cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicy")); diff != "" {
t.Errorf("validateClusterAndConstructClusterUpdate() returned unexpected diff (-want, +got):\n%s", diff)
}
})
@ -1357,7 +1357,7 @@ func (s) TestUnmarshalCluster(t *testing.T) {
if name != test.wantName {
t.Errorf("unmarshalClusterResource(%s), got name: %s, want: %s", pretty.ToJSON(test.resource), name, test.wantName)
}
if diff := cmp.Diff(update, test.wantUpdate, cmpOpts, cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicyJSON")); diff != "" {
if diff := cmp.Diff(update, test.wantUpdate, cmpOpts, cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicy")); diff != "" {
t.Errorf("unmarshalClusterResource(%s), got unexpected update, diff (-got +want): %v", pretty.ToJSON(test.resource), diff)
}
})
@ -1507,7 +1507,7 @@ func (s) TestValidateClusterWithOutlierDetection(t *testing.T) {
if (err != nil) != test.wantErr {
t.Errorf("validateClusterAndConstructClusterUpdate() returned err %v wantErr %v)", err, test.wantErr)
}
if diff := cmp.Diff(test.wantUpdate, update, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicyJSON")); diff != "" {
if diff := cmp.Diff(test.wantUpdate, update, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(ClusterUpdate{}, "LBPolicy")); diff != "" {
t.Errorf("validateClusterAndConstructClusterUpdate() returned unexpected diff (-want, +got):\n%s", diff)
}
})

View File

@ -141,6 +141,17 @@ func parseEDSRespProto(m *v3endpointpb.ClusterLoadAssignment) (EndpointsUpdate,
SubZone: l.SubZone,
}
lidStr, _ := lid.ToString()
// "Since an xDS configuration can place a given locality under multiple
// priorities, it is possible to see locality weight attributes with
// different values for the same locality." - A52
//
// This is handled in the client by emitting the locality weight
// specified for the priority it appears in. If the same locality has a
// different weight in two priorities, each priority will carry the
// locality weight specified for that priority, and the subsequent tree
// of balancers linked to that priority will use that locality weight as
// well.
if localitiesWithPriority[lidStr] {
return EndpointsUpdate{}, fmt.Errorf("duplicate locality %s with the same priority %v", lidStr, priority)
}