xds: support picking ringhash in xds client and cds policy (#4657)

This commit is contained in:
Menghan Li 2021-08-12 11:12:02 -07:00 committed by GitHub
parent ad87ad0098
commit a42567fe92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 238 additions and 22 deletions

View File

@ -32,9 +32,11 @@ import (
"google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/pretty"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver" "google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient"
) )
@ -333,6 +335,19 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
DiscoveryMechanisms: dms, DiscoveryMechanisms: dms,
} }
// lbPolicy is set only when the policy is ringhash. The default (when it's
// not set) is roundrobin. And similarly, we only need to set XDSLBPolicy
// for ringhash (it also defaults to roundrobin).
if lbp := update.lbPolicy; lbp != nil {
lbCfg.XDSLBPolicy = &internalserviceconfig.BalancerConfig{
Name: ringhash.Name,
Config: &ringhash.LBConfig{
MinRingSize: lbp.MinimumRingSize,
MaxRingSize: lbp.MaximumRingSize,
},
}
}
ccState := balancer.ClientConnState{ ccState := balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient), ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient),
BalancerConfig: lbCfg, BalancerConfig: lbCfg,

View File

@ -253,7 +253,7 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) {
// returned to the CDS balancer, because we have overridden the // returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup. // newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false) wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel() defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@ -309,7 +309,7 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) {
// newChildBalancer function as part of test setup. No security config is // newChildBalancer function as part of test setup. No security config is
// passed to the CDS balancer as part of this update. // passed to the CDS balancer as part of this update.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false) wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel() defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@ -465,7 +465,7 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be // create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the // returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup. // newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false) wantCCS := edsCCS(serviceName, nil, false, nil)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -499,7 +499,7 @@ func (s) TestGoodSecurityConfig(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be // create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the // returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup. // newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false) wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel() defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
@ -552,7 +552,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be // create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the // returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup. // newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false) wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel() defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
@ -602,7 +602,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) {
// create a new EDS balancer. The fake EDS balancer created above will be // create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the // returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup. // newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false) wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel() defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil { if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
@ -680,7 +680,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) {
SubjectAltNameMatchers: testSANMatchers, SubjectAltNameMatchers: testSANMatchers,
}, },
} }
wantCCS := edsCCS(serviceName, nil, false) wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel() defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {

View File

@ -32,10 +32,12 @@ import (
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal" "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/grpctest"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver" "google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
xdstestutils "google.golang.org/grpc/xds/internal/testutils" xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient"
@ -131,8 +133,8 @@ func (tb *testEDSBalancer) waitForClientConnUpdate(ctx context.Context, wantCCS
if xdsclient.FromResolverState(gotCCS.ResolverState) == nil { if xdsclient.FromResolverState(gotCCS.ResolverState) == nil {
return fmt.Errorf("want resolver state with XDSClient attached, got one without") return fmt.Errorf("want resolver state with XDSClient attached, got one without")
} }
if !cmp.Equal(gotCCS, wantCCS, cmpopts.IgnoreFields(resolver.State{}, "Attributes")) { if diff := cmp.Diff(gotCCS, wantCCS, cmpopts.IgnoreFields(resolver.State{}, "Attributes")); diff != "" {
return fmt.Errorf("received ClientConnState: %+v, want %+v", gotCCS, wantCCS) return fmt.Errorf("received unexpected ClientConnState, diff (-got +want): %v", diff)
} }
return nil return nil
} }
@ -196,7 +198,7 @@ func cdsCCS(cluster string, xdsC xdsclient.XDSClient) balancer.ClientConnState {
// edsCCS is a helper function to construct a good update passed from the // edsCCS is a helper function to construct a good update passed from the
// cdsBalancer to the edsBalancer. // cdsBalancer to the edsBalancer.
func edsCCS(service string, countMax *uint32, enableLRS bool) balancer.ClientConnState { func edsCCS(service string, countMax *uint32, enableLRS bool, xdslbpolicy *internalserviceconfig.BalancerConfig) balancer.ClientConnState {
discoveryMechanism := clusterresolver.DiscoveryMechanism{ discoveryMechanism := clusterresolver.DiscoveryMechanism{
Type: clusterresolver.DiscoveryMechanismTypeEDS, Type: clusterresolver.DiscoveryMechanismTypeEDS,
Cluster: service, Cluster: service,
@ -208,6 +210,7 @@ func edsCCS(service string, countMax *uint32, enableLRS bool) balancer.ClientCon
} }
lbCfg := &clusterresolver.LBConfig{ lbCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{discoveryMechanism}, DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{discoveryMechanism},
XDSLBPolicy: xdslbpolicy,
} }
return balancer.ClientConnState{ return balancer.ClientConnState{
@ -361,12 +364,23 @@ func (s) TestHandleClusterUpdate(t *testing.T) {
{ {
name: "happy-case-with-lrs", name: "happy-case-with-lrs",
cdsUpdate: xdsclient.ClusterUpdate{ClusterName: serviceName, EnableLRS: true}, cdsUpdate: xdsclient.ClusterUpdate{ClusterName: serviceName, EnableLRS: true},
wantCCS: edsCCS(serviceName, nil, true), wantCCS: edsCCS(serviceName, nil, true, nil),
}, },
{ {
name: "happy-case-without-lrs", name: "happy-case-without-lrs",
cdsUpdate: xdsclient.ClusterUpdate{ClusterName: serviceName}, cdsUpdate: xdsclient.ClusterUpdate{ClusterName: serviceName},
wantCCS: edsCCS(serviceName, nil, false), wantCCS: edsCCS(serviceName, nil, false, nil),
},
{
name: "happy-case-with-ring-hash-lb-policy",
cdsUpdate: xdsclient.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: &xdsclient.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100},
},
wantCCS: edsCCS(serviceName, nil, false, &internalserviceconfig.BalancerConfig{
Name: ringhash.Name,
Config: &ringhash.LBConfig{MinRingSize: 10, MaxRingSize: 100},
}),
}, },
} }
@ -434,7 +448,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
// returned to the CDS balancer, because we have overridden the // returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup. // newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false) wantCCS := edsCCS(serviceName, nil, false, nil)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -519,7 +533,7 @@ func (s) TestResolverError(t *testing.T) {
// returned to the CDS balancer, because we have overridden the // returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup. // newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false) wantCCS := edsCCS(serviceName, nil, false, nil)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -568,7 +582,7 @@ func (s) TestUpdateSubConnState(t *testing.T) {
// returned to the CDS balancer, because we have overridden the // returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup. // newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false) wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel() defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@ -603,7 +617,7 @@ func (s) TestCircuitBreaking(t *testing.T) {
// the service's counter with the new max requests. // the service's counter with the new max requests.
var maxRequests uint32 = 1 var maxRequests uint32 = 1
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: clusterName, MaxRequests: &maxRequests} cdsUpdate := xdsclient.ClusterUpdate{ClusterName: clusterName, MaxRequests: &maxRequests}
wantCCS := edsCCS(clusterName, &maxRequests, false) wantCCS := edsCCS(clusterName, &maxRequests, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel() defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@ -636,7 +650,7 @@ func (s) TestClose(t *testing.T) {
// returned to the CDS balancer, because we have overridden the // returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup. // newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName} cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false) wantCCS := edsCCS(serviceName, nil, false, nil)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel() defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {

View File

@ -32,6 +32,14 @@ var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a
type clusterHandlerUpdate struct { type clusterHandlerUpdate struct {
// securityCfg is the Security Config from the top (root) cluster. // securityCfg is the Security Config from the top (root) cluster.
securityCfg *xdsclient.SecurityConfig securityCfg *xdsclient.SecurityConfig
// lbPolicy is the lb policy from the top (root) cluster.
//
// Currently, we only support roundrobin or ringhash, and since roundrobin
// does need configs, this is only set to the ringhash config, if the policy
// is ringhash. In the future, if we support more policies, we can make this
// an interface, and set it to config of the other policies.
lbPolicy *xdsclient.ClusterLBPolicyRingHash
// updates is a list of ClusterUpdates from all the leaf clusters. // updates is a list of ClusterUpdates from all the leaf clusters.
updates []xdsclient.ClusterUpdate updates []xdsclient.ClusterUpdate
err error err error
@ -101,6 +109,7 @@ func (ch *clusterHandler) constructClusterUpdate() {
} }
ch.updateChannel <- clusterHandlerUpdate{ ch.updateChannel <- clusterHandlerUpdate{
securityCfg: ch.root.clusterUpdate.SecurityCfg, securityCfg: ch.root.clusterUpdate.SecurityCfg,
lbPolicy: ch.root.clusterUpdate.LBPolicy,
updates: clusterUpdate, updates: clusterUpdate,
} }
} }

View File

@ -53,20 +53,34 @@ func (s) TestSuccessCaseLeafNode(t *testing.T) {
name string name string
clusterName string clusterName string
clusterUpdate xdsclient.ClusterUpdate clusterUpdate xdsclient.ClusterUpdate
lbPolicy *xdsclient.ClusterLBPolicyRingHash
}{ }{
{name: "test-update-root-cluster-EDS-success", {
name: "test-update-root-cluster-EDS-success",
clusterName: edsService, clusterName: edsService,
clusterUpdate: xdsclient.ClusterUpdate{ clusterUpdate: xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeEDS, ClusterType: xdsclient.ClusterTypeEDS,
ClusterName: edsService, ClusterName: edsService,
}}, },
},
{
name: "test-update-root-cluster-EDS-with-ring-hash",
clusterName: logicalDNSService,
clusterUpdate: xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeLogicalDNS,
ClusterName: logicalDNSService,
LBPolicy: &xdsclient.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100},
},
lbPolicy: &xdsclient.ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100},
},
{ {
name: "test-update-root-cluster-Logical-DNS-success", name: "test-update-root-cluster-Logical-DNS-success",
clusterName: logicalDNSService, clusterName: logicalDNSService,
clusterUpdate: xdsclient.ClusterUpdate{ clusterUpdate: xdsclient.ClusterUpdate{
ClusterType: xdsclient.ClusterTypeLogicalDNS, ClusterType: xdsclient.ClusterTypeLogicalDNS,
ClusterName: logicalDNSService, ClusterName: logicalDNSService,
}}, },
},
} }
for _, test := range tests { for _, test := range tests {
@ -98,6 +112,9 @@ func (s) TestSuccessCaseLeafNode(t *testing.T) {
if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{test.clusterUpdate}); diff != "" { if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{test.clusterUpdate}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff) t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
} }
if diff := cmp.Diff(chu.lbPolicy, test.lbPolicy); diff != "" {
t.Fatalf("got unexpected lb policy in cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done(): case <-ctx.Done():
t.Fatal("Timed out waiting for update from update channel.") t.Fatal("Timed out waiting for update from update channel.")
} }

View File

@ -108,7 +108,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) {
wantErr: true, wantErr: true,
}, },
{ {
name: "non-round-robin-lb-policy", name: "non-round-robin-or-ring-hash-lb-policy",
cluster: &v3clusterpb.Cluster{ cluster: &v3clusterpb.Cluster{
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS}, ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{ EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
@ -140,6 +140,59 @@ func (s) TestValidateCluster_Failure(t *testing.T) {
wantUpdate: emptyUpdate, wantUpdate: emptyUpdate,
wantErr: true, wantErr: true,
}, },
{
name: "ring-hash-hash-function-not-xx-hash",
cluster: &v3clusterpb.Cluster{
LbPolicy: v3clusterpb.Cluster_RING_HASH,
LbConfig: &v3clusterpb.Cluster_RingHashLbConfig_{
RingHashLbConfig: &v3clusterpb.Cluster_RingHashLbConfig{
HashFunction: v3clusterpb.Cluster_RingHashLbConfig_MURMUR_HASH_2,
},
},
},
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "ring-hash-min-bound-greater-than-max",
cluster: &v3clusterpb.Cluster{
LbPolicy: v3clusterpb.Cluster_RING_HASH,
LbConfig: &v3clusterpb.Cluster_RingHashLbConfig_{
RingHashLbConfig: &v3clusterpb.Cluster_RingHashLbConfig{
MinimumRingSize: wrapperspb.UInt64(100),
MaximumRingSize: wrapperspb.UInt64(10),
},
},
},
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "ring-hash-min-bound-greater-than-upper-bound",
cluster: &v3clusterpb.Cluster{
LbPolicy: v3clusterpb.Cluster_RING_HASH,
LbConfig: &v3clusterpb.Cluster_RingHashLbConfig_{
RingHashLbConfig: &v3clusterpb.Cluster_RingHashLbConfig{
MinimumRingSize: wrapperspb.UInt64(ringHashSizeUpperBound + 1),
},
},
},
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "ring-hash-max-bound-greater-than-upper-bound",
cluster: &v3clusterpb.Cluster{
LbPolicy: v3clusterpb.Cluster_RING_HASH,
LbConfig: &v3clusterpb.Cluster_RingHashLbConfig_{
RingHashLbConfig: &v3clusterpb.Cluster_RingHashLbConfig{
MaximumRingSize: wrapperspb.UInt64(ringHashSizeUpperBound + 1),
},
},
},
wantUpdate: emptyUpdate,
wantErr: true,
},
} }
oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv
@ -301,6 +354,62 @@ func (s) TestValidateCluster_Success(t *testing.T) {
}, },
wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: true, MaxRequests: func() *uint32 { i := uint32(512); return &i }()}, wantUpdate: ClusterUpdate{ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: true, MaxRequests: func() *uint32 { i := uint32(512); return &i }()},
}, },
{
name: "happiest-case-with-ring-hash-lb-policy-with-default-config",
cluster: &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
EdsConfig: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
Ads: &v3corepb.AggregatedConfigSource{},
},
},
ServiceName: serviceName,
},
LbPolicy: v3clusterpb.Cluster_RING_HASH,
LrsServer: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{
Self: &v3corepb.SelfConfigSource{},
},
},
},
wantUpdate: ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: true,
LBPolicy: &ClusterLBPolicyRingHash{MinimumRingSize: defaultRingHashMinSize, MaximumRingSize: defaultRingHashMaxSize},
},
},
{
name: "happiest-case-with-ring-hash-lb-policy-with-none-default-config",
cluster: &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
EdsConfig: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
Ads: &v3corepb.AggregatedConfigSource{},
},
},
ServiceName: serviceName,
},
LbPolicy: v3clusterpb.Cluster_RING_HASH,
LbConfig: &v3clusterpb.Cluster_RingHashLbConfig_{
RingHashLbConfig: &v3clusterpb.Cluster_RingHashLbConfig{
MinimumRingSize: wrapperspb.UInt64(10),
MaximumRingSize: wrapperspb.UInt64(100),
},
},
LrsServer: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{
Self: &v3corepb.SelfConfigSource{},
},
},
},
wantUpdate: ClusterUpdate{
ClusterName: clusterName, EDSServiceName: serviceName, EnableLRS: true,
LBPolicy: &ClusterLBPolicyRingHash{MinimumRingSize: 10, MaximumRingSize: 100},
},
},
} }
oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv oldAggregateAndDNSSupportEnv := env.AggregateAndDNSSupportEnv

View File

@ -421,6 +421,13 @@ const (
ClusterTypeAggregate ClusterTypeAggregate
) )
// ClusterLBPolicyRingHash represents ring_hash lb policy, and also contains its
// config.
type ClusterLBPolicyRingHash struct {
MinimumRingSize uint64
MaximumRingSize uint64
}
// ClusterUpdate contains information from a received CDS response, which is of // ClusterUpdate contains information from a received CDS response, which is of
// interest to the registered CDS watcher. // interest to the registered CDS watcher.
type ClusterUpdate struct { type ClusterUpdate struct {
@ -443,6 +450,16 @@ type ClusterUpdate struct {
// a prioritized list of cluster names. // a prioritized list of cluster names.
PrioritizedClusterNames []string PrioritizedClusterNames []string
// LBPolicy is the lb policy for this cluster.
//
// This only support round_robin and ring_hash.
// - if it's nil, the lb policy is round_robin
// - if it's not nil, the lb policy is ring_hash, the this field has the config.
//
// When we add more support policies, this can be made an interface, and
// will be set to different types based on the policy type.
LBPolicy *ClusterLBPolicyRingHash
// Raw is the resource from the xds response. // Raw is the resource from the xds response.
Raw *anypb.Any Raw *anypb.Any
} }

View File

@ -574,8 +574,42 @@ func unmarshalClusterResource(r *anypb.Any, logger *grpclog.PrefixLogger) (strin
return cluster.GetName(), cu, nil return cluster.GetName(), cu, nil
} }
const (
defaultRingHashMinSize = 1024
defaultRingHashMaxSize = 8 * 1024 * 1024 // 8M
ringHashSizeUpperBound = 8 * 1024 * 1024 // 8M
)
func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) { func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) {
if cluster.GetLbPolicy() != v3clusterpb.Cluster_ROUND_ROBIN { var lbPolicy *ClusterLBPolicyRingHash
switch cluster.GetLbPolicy() {
case v3clusterpb.Cluster_ROUND_ROBIN:
lbPolicy = nil // The default is round_robin, and there's no config to set.
case v3clusterpb.Cluster_RING_HASH:
rhc := cluster.GetRingHashLbConfig()
if rhc.GetHashFunction() != v3clusterpb.Cluster_RingHashLbConfig_XX_HASH {
return ClusterUpdate{}, fmt.Errorf("unsupported ring_hash hash function %v in response: %+v", rhc.GetHashFunction(), cluster)
}
// Minimum defaults to 1024 entries, and limited to 8M entries Maximum
// defaults to 8M entries, and limited to 8M entries
var minSize, maxSize uint64 = defaultRingHashMinSize, defaultRingHashMaxSize
if min := rhc.GetMinimumRingSize(); min != nil {
if min.GetValue() > ringHashSizeUpperBound {
return ClusterUpdate{}, fmt.Errorf("unexpected ring_hash mininum ring size %v in response: %+v", min.GetValue(), cluster)
}
minSize = min.GetValue()
}
if max := rhc.GetMaximumRingSize(); max != nil {
if max.GetValue() > ringHashSizeUpperBound {
return ClusterUpdate{}, fmt.Errorf("unexpected ring_hash maxinum ring size %v in response: %+v", max.GetValue(), cluster)
}
maxSize = max.GetValue()
}
if minSize > maxSize {
return ClusterUpdate{}, fmt.Errorf("ring_hash config min size %v is greater than max %v", minSize, maxSize)
}
lbPolicy = &ClusterLBPolicyRingHash{MinimumRingSize: minSize, MaximumRingSize: maxSize}
default:
return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster) return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
} }
@ -594,6 +628,7 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
EnableLRS: cluster.GetLrsServer().GetSelf() != nil, EnableLRS: cluster.GetLrsServer().GetSelf() != nil,
SecurityCfg: sc, SecurityCfg: sc,
MaxRequests: circuitBreakersFromCluster(cluster), MaxRequests: circuitBreakersFromCluster(cluster),
LBPolicy: lbPolicy,
} }
// Validate and set cluster type from the response. // Validate and set cluster type from the response.