xds: support cluster fallback in cluster_resolver (#4594)

Menghan Li 2021-07-21 10:22:02 -07:00 committed by GitHub
parent 65cabd74d8
commit 0300770df1
17 changed files with 589 additions and 724 deletions
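A note on shape before the diff: this change replaces the cluster_resolver balancer's single-cluster EDSConfig with an LBConfig holding an ordered list of discovery mechanisms, where results from successive mechanisms become successive priorities, so later entries act as fallbacks for earlier ones. Below is a minimal, runnable sketch of that config shape; the types are hypothetical local stand-ins rather than the real clusterresolver/balancerconfig structs, and the JSON mirrors testJSONConfig2 from the new tests.

package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical stand-ins for balancerconfig.DiscoveryMechanism and
// clusterresolver.LBConfig, kept only to make this sketch self-contained.
type discoveryMechanism struct {
	Cluster        string `json:"cluster,omitempty"`
	Type           string `json:"type,omitempty"`
	EDSServiceName string `json:"edsServiceName,omitempty"`
}

type lbConfig struct {
	DiscoveryMechanisms []discoveryMechanism `json:"discoveryMechanisms,omitempty"`
}

func main() {
	// An EDS mechanism first, with a LOGICAL_DNS mechanism as the
	// fallback in the next priority.
	js := `{
		"discoveryMechanisms": [
			{"cluster": "test-cluster-name", "type": "EDS", "edsServiceName": "test-eds-service-name"},
			{"type": "LOGICAL_DNS"}
		]
	}`
	var cfg lbConfig
	if err := json.Unmarshal([]byte(js), &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%d mechanisms; primary: %s, fallback: %s\n",
		len(cfg.DiscoveryMechanisms),
		cfg.DiscoveryMechanisms[0].Type,
		cfg.DiscoveryMechanisms[1].Type)
}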


@ -35,27 +35,27 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
"google.golang.org/grpc/xds/internal/xdsclient"
)
const (
cdsName = "cds_experimental"
edsName = "eds_experimental"
)
var (
errBalancerClosed = errors.New("cdsBalancer is closed")
// newEDSBalancer is a helper function to build a new edsBalancer and will be
// overridden in unittests.
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
builder := balancer.Get(edsName)
// newChildBalancer is a helper function to build a new cluster_resolver
// balancer and will be overridden in unittests.
newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
builder := balancer.Get(clusterresolver.Name)
if builder == nil {
return nil, fmt.Errorf("xds: no balancer builder with name %v", edsName)
return nil, fmt.Errorf("xds: no balancer builder with name %v", clusterresolver.Name)
}
// We directly pass the parent clientConn to the
// underlying edsBalancer because the cdsBalancer does
// not deal with subConns.
// We directly pass the parent clientConn to the underlying
// cluster_resolver balancer because the cdsBalancer does not deal with
// subConns.
return builder.Build(cc, opts), nil
}
buildProvider = buildProviderFunc
@ -126,31 +126,32 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
// ccUpdate wraps a clientConn update received from gRPC (pushed from the
// xdsResolver). A valid clusterName causes the cdsBalancer to register a CDS
// watcher with the xdsClient, while a non-nil error causes it to cancel the
// existing watch and propagate the error to the underlying edsBalancer.
// existing watch and propagate the error to the underlying cluster_resolver
// balancer.
type ccUpdate struct {
clusterName string
err error
}
// scUpdate wraps a subConn update received from gRPC. This is directly passed
// on to the edsBalancer.
// on to the cluster_resolver balancer.
type scUpdate struct {
subConn balancer.SubConn
state balancer.SubConnState
}
// cdsBalancer implements a CDS based LB policy. It instantiates an EDS based
// LB policy to further resolve the serviceName received from CDS, into
// localities and endpoints. Implements the balancer.Balancer interface which
// is exposed to gRPC and implements the balancer.ClientConn interface which is
// exposed to the edsBalancer.
// cdsBalancer implements a CDS based LB policy. It instantiates a
// cluster_resolver balancer to further resolve the serviceName received from
// CDS, into localities and endpoints. Implements the balancer.Balancer
// interface which is exposed to gRPC and implements the balancer.ClientConn
// interface which is exposed to the cluster_resolver balancer.
type cdsBalancer struct {
ccw *ccWrapper // ClientConn interface passed to child LB.
bOpts balancer.BuildOptions // BuildOptions passed to child LB.
updateCh *buffer.Unbounded // Channel for gRPC and xdsClient updates.
xdsClient xdsclient.XDSClient // xDS client to watch Cluster resource.
clusterHandler *clusterHandler // To watch the clusters.
edsLB balancer.Balancer // EDS child policy.
childLB balancer.Balancer
logger *grpclog.PrefixLogger
closed *grpcsync.Event
done *grpcsync.Event
@ -166,7 +167,7 @@ type cdsBalancer struct {
// handleClientConnUpdate handles a ClientConnUpdate received from gRPC. Good
// updates lead to registration of a CDS watch. Updates with error lead to
// cancellation of existing watch and propagation of the same error to the
// edsBalancer.
// cluster_resolver balancer.
func (b *cdsBalancer) handleClientConnUpdate(update *ccUpdate) {
// We first handle errors, if any, and then proceed with handling the
// update, only if the status quo has changed.
@ -266,7 +267,7 @@ func buildProviderFunc(configs map[string]*certprovider.BuildableConfig, instanc
}
// handleWatchUpdate handles a watch update from the xDS Client. Good updates
// lead to clientConn updates being invoked on the underlying edsBalancer.
// lead to clientConn updates being invoked on the underlying cluster_resolver balancer.
func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
if err := update.err; err != nil {
b.logger.Warningf("Watch error from xds-client %p: %v", b.xdsClient, err)
@ -274,7 +275,7 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
return
}
b.logger.Infof("Watch update from xds-client %p, content: %+v, security config: %v", b.xdsClient, pretty.ToJSON(update.chu), pretty.ToJSON(update.securityCfg))
b.logger.Infof("Watch update from xds-client %p, content: %+v, security config: %v", b.xdsClient, pretty.ToJSON(update.updates), pretty.ToJSON(update.securityCfg))
// Process the security config from the received update before building the
// child policy or forwarding the update to it. We do this because the child
@ -291,47 +292,54 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) {
}
// The first good update from the watch API leads to the instantiation of an
// edsBalancer. Further updates/errors are propagated to the existing
// edsBalancer.
if b.edsLB == nil {
edsLB, err := newEDSBalancer(b.ccw, b.bOpts)
// cluster_resolver balancer. Further updates/errors are propagated to the existing
// cluster_resolver balancer.
if b.childLB == nil {
childLB, err := newChildBalancer(b.ccw, b.bOpts)
if err != nil {
b.logger.Errorf("Failed to create child policy of type %s, %v", edsName, err)
b.logger.Errorf("Failed to create child policy of type %s, %v", clusterresolver.Name, err)
return
}
b.edsLB = edsLB
b.logger.Infof("Created child policy %p of type %s", b.edsLB, edsName)
b.childLB = childLB
b.logger.Infof("Created child policy %p of type %s", b.childLB, clusterresolver.Name)
}
if len(update.chu) == 0 {
b.logger.Infof("got update with 0 cluster updates, should never happen. There should be at least one cluster")
}
// TODO: this function currently only handles the highest-priority cluster.
// This should work in most cases (e.g. if the cluster is not an aggregated
// cluster, or if the highest-priority cluster works fine so there's no need
// to fall back). This quick fix is to unblock the testing work before the
// full fallback support is complete. Once the EDS balancer is updated to
// cluster_resolver, which has the fallback functionality, we will fix this
// to handle all the clusters in the list.
cds := update.chu[0]
lbCfg := &clusterresolver.EDSConfig{
ClusterName: cds.ClusterName,
EDSServiceName: cds.EDSServiceName,
MaxConcurrentRequests: cds.MaxRequests,
}
if cds.EnableLRS {
// An empty string here indicates that the edsBalancer should use the
// same xDS server for load reporting as it does for EDS
// requests/responses.
lbCfg.LrsLoadReportingServerName = new(string)
dms := make([]balancerconfig.DiscoveryMechanism, len(update.updates))
for i, cu := range update.updates {
switch cu.ClusterType {
case xdsclient.ClusterTypeEDS:
dms[i] = balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: cu.ClusterName,
EDSServiceName: cu.EDSServiceName,
MaxConcurrentRequests: cu.MaxRequests,
}
if cu.EnableLRS {
// An empty string here indicates that the cluster_resolver balancer should use the
// same xDS server for load reporting as it does for EDS
// requests/responses.
dms[i].LoadReportingServerName = new(string)
}
case xdsclient.ClusterTypeLogicalDNS:
dms[i] = balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: cu.DNSHostName,
}
default:
b.logger.Infof("unexpected cluster type %v when handling update from cluster handler", cu.ClusterType)
}
}
lbCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: dms,
}
ccState := balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, b.xdsClient),
BalancerConfig: lbCfg,
}
if err := b.edsLB.UpdateClientConnState(ccState); err != nil {
b.logger.Errorf("xds: edsBalancer.UpdateClientConnState(%+v) returned error: %v", ccState, err)
if err := b.childLB.UpdateClientConnState(ccState); err != nil {
b.logger.Errorf("xds: cluster_resolver balancer.UpdateClientConnState(%+v) returned error: %v", ccState, err)
}
}
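The loop above is what implements fallback ordering: leaf cluster i in update.updates becomes dms[i], and the cluster_resolver balancer later expands successive mechanisms into successive priorities. Here is a self-contained sketch of that mapping for a two-leaf aggregate cluster, using hypothetical stand-in types (the real ones are xdsclient.ClusterUpdate and balancerconfig.DiscoveryMechanism); the cluster and host names are made up.

package main

import "fmt"

type clusterUpdate struct {
	ClusterType string // stand-in for xdsclient.ClusterTypeEDS / ClusterTypeLogicalDNS
	ClusterName string
	DNSHostName string
}

type discoveryMechanism struct {
	Type        string
	Cluster     string
	DNSHostname string
}

// toMechanisms preserves slice order, so the EDS mechanism is tried first
// and the LOGICAL_DNS mechanism serves as its fallback.
func toMechanisms(updates []clusterUpdate) []discoveryMechanism {
	dms := make([]discoveryMechanism, len(updates))
	for i, cu := range updates {
		switch cu.ClusterType {
		case "EDS":
			dms[i] = discoveryMechanism{Type: "EDS", Cluster: cu.ClusterName}
		case "LOGICAL_DNS":
			dms[i] = discoveryMechanism{Type: "LOGICAL_DNS", DNSHostname: cu.DNSHostName}
		}
	}
	return dms
}

func main() {
	dms := toMechanisms([]clusterUpdate{
		{ClusterType: "EDS", ClusterName: "primary-cluster"},
		{ClusterType: "LOGICAL_DNS", DNSHostName: "fallback.example.com:443"},
	})
	fmt.Printf("%+v\n", dms)
}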
@ -348,20 +356,20 @@ func (b *cdsBalancer) run() {
b.handleClientConnUpdate(update)
case *scUpdate:
// SubConn updates are passthrough and are simply handed over to
// the underlying edsBalancer.
if b.edsLB == nil {
b.logger.Errorf("xds: received scUpdate {%+v} with no edsBalancer", update)
// the underlying cluster_resolver balancer.
if b.childLB == nil {
b.logger.Errorf("xds: received scUpdate {%+v} with no cluster_resolver balancer", update)
break
}
b.edsLB.UpdateSubConnState(update.subConn, update.state)
b.childLB.UpdateSubConnState(update.subConn, update.state)
}
case u := <-b.clusterHandler.updateChannel:
b.handleWatchUpdate(u)
case <-b.closed.Done():
b.clusterHandler.close()
if b.edsLB != nil {
b.edsLB.Close()
b.edsLB = nil
if b.childLB != nil {
b.childLB.Close()
b.childLB = nil
}
if b.cachedRoot != nil {
b.cachedRoot.Close()
@ -389,22 +397,22 @@ func (b *cdsBalancer) run() {
// - If it's from the xds client, it means the CDS resource was removed. The CDS
// watcher should keep watching.
//
// In both cases, the error will be forwarded to EDS balancer. And if error is
// resource-not-found, the child EDS balancer will stop watching EDS.
// In both cases, the error will be forwarded to the child balancer. And if
// error is resource-not-found, the child balancer will stop watching EDS.
func (b *cdsBalancer) handleErrorFromUpdate(err error, fromParent bool) {
// This is not necessary today, because xds client never sends connection
// errors.
if fromParent && xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
b.clusterHandler.close()
}
if b.edsLB != nil {
if b.childLB != nil {
if xdsclient.ErrType(err) != xdsclient.ErrorTypeConnection {
// Connection errors will be sent to the child balancers directly.
// There's no need to forward them.
b.edsLB.ResolverError(err)
b.childLB.ResolverError(err)
}
} else {
// If eds balancer was never created, fail the RPCs with
// If child balancer was never created, fail the RPCs with
// errors.
b.ccw.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,


@ -153,8 +153,8 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
// Override the creation of the EDS balancer to return a fake EDS balancer
// implementation.
edsB := newTestEDSBalancer()
oldEDSBalancerBuilder := newEDSBalancer
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
oldEDSBalancerBuilder := newChildBalancer
newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
edsB.parentCC = cc
return edsB, nil
}
@ -177,7 +177,7 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
}
return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
newEDSBalancer = oldEDSBalancerBuilder
newChildBalancer = oldEDSBalancerBuilder
xdsC.Close()
}
}
@ -251,7 +251,7 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -306,7 +306,7 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup. No security config is
// newChildBalancer function as part of test setup. No security config is
// passed to the CDS balancer as part of this update.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false)
@ -464,7 +464,7 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdateWithGoodSecurityCfg, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
@ -498,7 +498,7 @@ func (s) TestGoodSecurityConfig(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
@ -551,7 +551,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
@ -601,7 +601,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
@ -672,7 +672,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{
ClusterName: serviceName,
SecurityCfg: &xdsclient.SecurityConfig{


@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
@ -197,20 +198,26 @@ func cdsCCS(cluster string, xdsC xdsclient.XDSClient) balancer.ClientConnState {
// edsCCS is a helper function to construct a good update passed from the
// cdsBalancer to the edsBalancer.
func edsCCS(service string, countMax *uint32, enableLRS bool) balancer.ClientConnState {
lbCfg := &clusterresolver.EDSConfig{
ClusterName: service,
discoveryMechanism := balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: service,
MaxConcurrentRequests: countMax,
}
if enableLRS {
lbCfg.LrsLoadReportingServerName = new(string)
discoveryMechanism.LoadReportingServerName = new(string)
}
lbCfg := &clusterresolver.LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{discoveryMechanism},
}
return balancer.ClientConnState{
BalancerConfig: lbCfg,
}
}
// setup creates a cdsBalancer and an edsBalancer (and overrides the
// newEDSBalancer function to return it), and also returns a cleanup function.
// newChildBalancer function to return it), and also returns a cleanup function.
func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) {
t.Helper()
xdsC := fakeclient.NewClient()
@ -222,14 +229,14 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *x
cdsB := builder.Build(tcc, balancer.BuildOptions{})
edsB := newTestEDSBalancer()
oldEDSBalancerBuilder := newEDSBalancer
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
oldEDSBalancerBuilder := newChildBalancer
newChildBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) (balancer.Balancer, error) {
edsB.parentCC = cc
return edsB, nil
}
return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
newEDSBalancer = oldEDSBalancerBuilder
newChildBalancer = oldEDSBalancerBuilder
xdsC.Close()
}
}
@ -426,7 +433,7 @@ func (s) TestHandleClusterUpdateError(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@ -511,7 +518,7 @@ func (s) TestResolverError(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false)
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
@ -560,7 +567,7 @@ func (s) TestUpdateSubConnState(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -628,7 +635,7 @@ func (s) TestClose(t *testing.T) {
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newEDSBalancer function as part of test setup.
// newChildBalancer function as part of test setup.
cdsUpdate := xdsclient.ClusterUpdate{ClusterName: serviceName}
wantCCS := edsCCS(serviceName, nil, false)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)


@ -32,9 +32,9 @@ var errNotReceivedUpdate = errors.New("tried to construct a cluster update on a
type clusterHandlerUpdate struct {
// securityCfg is the Security Config from the top (root) cluster.
securityCfg *xdsclient.SecurityConfig
// chu is a list of ClusterUpdates from all the leaf clusters.
chu []xdsclient.ClusterUpdate
err error
// updates is a list of ClusterUpdates from all the leaf clusters.
updates []xdsclient.ClusterUpdate
err error
}
// clusterHandler will be given a name representing a cluster. It will then
@ -101,7 +101,7 @@ func (ch *clusterHandler) constructClusterUpdate() {
}
ch.updateChannel <- clusterHandlerUpdate{
securityCfg: ch.root.clusterUpdate.SecurityCfg,
chu: clusterUpdate,
updates: clusterUpdate,
}
}
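The tests below rely on the guarantee that the leaf updates in this list are ordered as per the aggregate cluster definition. A minimal sketch of that flattening, with a hypothetical tree type standing in for the real cluster-node graph and made-up cluster names:

package main

import "fmt"

type node struct {
	name     string
	children []*node // nil for leaf (EDS or LOGICAL_DNS) clusters.
}

// leaves walks children depth-first in declaration order, so earlier
// leaves end up earlier in the update list (i.e. at higher priority).
func leaves(n *node) []string {
	if len(n.children) == 0 {
		return []string{n.name}
	}
	var out []string
	for _, c := range n.children {
		out = append(out, leaves(c)...)
	}
	return out
}

func main() {
	root := &node{name: "aggregate", children: []*node{
		{name: "primary-eds"},
		{name: "fallback-dns"},
	}}
	fmt.Println(leaves(root)) // [primary-eds fallback-dns]
}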


@ -95,7 +95,7 @@ func (s) TestSuccessCaseLeafNode(t *testing.T) {
fakeClient.InvokeWatchClusterCallback(test.clusterUpdate, nil)
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{test.clusterUpdate}); diff != "" {
if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{test.clusterUpdate}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
@ -189,7 +189,7 @@ func (s) TestSuccessCaseLeafNodeThenNewUpdate(t *testing.T) {
fakeClient.InvokeWatchClusterCallback(test.newClusterUpdate, nil)
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{test.newClusterUpdate}); diff != "" {
if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{test.newClusterUpdate}); diff != "" {
t.Fatalf("got unexpected cluster update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
@ -305,7 +305,7 @@ func (s) TestUpdateRootClusterAggregateSuccess(t *testing.T) {
// ordered as per the cluster update.
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{
if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{{
ClusterType: xdsclient.ClusterTypeEDS,
ClusterName: edsService,
}, {
@ -412,7 +412,7 @@ func (s) TestUpdateRootClusterAggregateThenChangeChild(t *testing.T) {
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{
if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{{
ClusterType: xdsclient.ClusterTypeEDS,
ClusterName: edsService,
}, {
@ -658,7 +658,7 @@ func (s) TestSwitchClusterNodeBetweenLeafAndAggregated(t *testing.T) {
// Then an update should successfully be written to the update buffer.
select {
case chu := <-ch.updateChannel:
if diff := cmp.Diff(chu.chu, []xdsclient.ClusterUpdate{{
if diff := cmp.Diff(chu.updates, []xdsclient.ClusterUpdate{{
ClusterType: xdsclient.ClusterTypeEDS,
ClusterName: edsService2,
}}); diff != "" {


@ -34,12 +34,13 @@ import (
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/xdsclient"
)
// Name is the name of the cluster_resolver balancer.
const Name = "eds_experimental"
const Name = "cluster_resolver_experimental"
var (
errBalancerClosed = errors.New("cdsBalancer is closed")
@ -68,7 +69,6 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
}
b := &clusterResolverBalancer{
cc: cc,
bOpts: opts,
updateCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
@ -79,9 +79,11 @@ func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Bal
}
b.logger = prefixLogger(b)
b.logger.Infof("Created")
b.edsWatcher = &edsWatcher{
parent: b,
updateChannel: make(chan *watchUpdate, 1),
b.resourceWatcher = newResourceResolver(b)
b.cc = &ccWrapper{
ClientConn: cc,
resourceWatcher: b.resourceWatcher,
}
go b.run()
@ -93,9 +95,9 @@ func (bb) Name() string {
}
func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg EDSConfig
var cfg LBConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, fmt.Errorf("unable to unmarshal balancer config %s into EDSConfig, error: %v", string(c), err)
return nil, fmt.Errorf("unable to unmarshal balancer config %s into cluster-resolver config, error: %v", string(c), err)
}
return &cfg, nil
}
@ -114,33 +116,35 @@ type scUpdate struct {
state balancer.SubConnState
}
// clusterResolverBalancer manages xdsClient and the actual EDS balancer
// implementation that does load balancing.
// clusterResolverBalancer manages xdsClient and the actual EDS balancer implementation that
// does load balancing.
//
// It currently has only one child balancer. Later, we may add fallback.
type clusterResolverBalancer struct {
cc balancer.ClientConn
bOpts balancer.BuildOptions
updateCh *buffer.Unbounded // Channel for updates from gRPC.
edsWatcher *edsWatcher
logger *grpclog.PrefixLogger
closed *grpcsync.Event
done *grpcsync.Event
cc balancer.ClientConn
bOpts balancer.BuildOptions
updateCh *buffer.Unbounded // Channel for updates from gRPC.
resourceWatcher *resourceResolver
logger *grpclog.PrefixLogger
closed *grpcsync.Event
done *grpcsync.Event
priorityBuilder balancer.Builder
priorityConfigParser balancer.ConfigParser
config *EDSConfig
config *LBConfig
configRaw *serviceconfig.ParseResult
xdsClient xdsclient.XDSClient // xDS client to watch EDS resource.
attrsWithClient *attributes.Attributes // Attributes with xdsClient attached to be passed to the child policies.
child balancer.Balancer
edsResp xdsclient.EndpointsUpdate
priorities []balancerconfig.PriorityConfig
watchUpdateReceived bool
}
// handleClientConnUpdate handles a ClientConnUpdate received from gRPC. Good
// updates lead to registration of an EDS watch. Updates with error lead to
// cancellation of existing watch and propagation of the same error to the
// updates lead to registration of EDS and DNS watches. Updates with error lead
// to cancellation of existing watch and propagation of the same error to the
// child balancer.
func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) {
// We first handle errors, if any, and then proceed with handling the
@ -151,7 +155,7 @@ func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) {
}
b.logger.Infof("Receive update from resolver, balancer config: %v", pretty.ToJSON(update.state.BalancerConfig))
cfg, _ := update.state.BalancerConfig.(*EDSConfig)
cfg, _ := update.state.BalancerConfig.(*LBConfig)
if cfg == nil {
b.logger.Warningf("xds: unexpected LoadBalancingConfig type: %T", update.state.BalancerConfig)
return
@ -159,7 +163,7 @@ func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) {
b.config = cfg
b.configRaw = update.state.ResolverState.ServiceConfig
b.edsWatcher.updateConfig(cfg)
b.resourceWatcher.updateMechanisms(cfg.DiscoveryMechanisms)
if !b.watchUpdateReceived {
// If update was not received, wait for it.
@ -175,16 +179,16 @@ func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) {
// handleWatchUpdate handles a watch update from the xDS Client. Good updates
// lead to clientConn updates being invoked on the underlying child balancer.
func (b *clusterResolverBalancer) handleWatchUpdate(update *watchUpdate) {
func (b *clusterResolverBalancer) handleWatchUpdate(update *resourceUpdate) {
if err := update.err; err != nil {
b.logger.Warningf("Watch error from xds-client %p: %v", b.xdsClient, err)
b.handleErrorFromUpdate(err, false)
return
}
b.logger.Infof("resource update: %+v", pretty.ToJSON(update.eds))
b.logger.Infof("resource update: %+v", pretty.ToJSON(update.priorities))
b.watchUpdateReceived = true
b.edsResp = update.eds
b.priorities = update.priorities
// A new EDS update triggers new child configs (e.g. different priorities
// for the priority balancer), and new addresses (the endpoints come from
@ -206,7 +210,7 @@ func (b *clusterResolverBalancer) updateChildConfig() error {
b.child = newChildBalancer(b.priorityBuilder, b.cc, b.bOpts)
}
childCfgBytes, addrs, err := buildPriorityConfigJSON(b.edsResp, b.config)
childCfgBytes, addrs, err := balancerconfig.BuildPriorityConfigJSON(b.priorities, b.config.EndpointPickingPolicy)
if err != nil {
return fmt.Errorf("failed to build priority balancer config: %v", err)
}
@ -243,7 +247,7 @@ func (b *clusterResolverBalancer) handleErrorFromUpdate(err error, fromParent bo
// This is an error from the parent ClientConn (can be the parent CDS
// balancer), and is a resource-not-found error. This means the resource
// (can be either LDS or CDS) was removed. Stop the EDS watch.
b.edsWatcher.stopWatch()
b.resourceWatcher.stop()
}
if b.child != nil {
b.child.ResolverError(err)
@ -277,13 +281,13 @@ func (b *clusterResolverBalancer) run() {
}
b.child.UpdateSubConnState(update.subConn, update.state)
}
case u := <-b.edsWatcher.updateChannel:
case u := <-b.resourceWatcher.updateChannel:
b.handleWatchUpdate(u)
// Close results in cancellation of the EDS watch and closing of the
// underlying child policy and is the only way to exit this goroutine.
case <-b.closed.Done():
b.edsWatcher.stopWatch()
b.resourceWatcher.stop()
if b.child != nil {
b.child.Close()
@ -344,3 +348,14 @@ func (b *clusterResolverBalancer) Close() {
b.closed.Fire()
<-b.done.Done()
}
// ccWrapper overrides ResolveNow(), so that re-resolution from the child
// policies will trigger the DNS resolver in the cluster_resolver balancer.
type ccWrapper struct {
balancer.ClientConn
resourceWatcher *resourceResolver
}
func (c *ccWrapper) ResolveNow(resolver.ResolveNowOptions) {
c.resourceWatcher.resolveNow()
}
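This works through Go's interface embedding: ccWrapper inherits every balancer.ClientConn method from the embedded interface value and overrides only ResolveNow. A generic, self-contained sketch of the pattern (all types here are hypothetical stand-ins):

package main

import "fmt"

// conn is a stand-in for balancer.ClientConn.
type conn interface {
	ResolveNow()
	UpdateState(state string)
}

type parentConn struct{}

func (parentConn) ResolveNow()              { fmt.Println("parent: re-resolve") }
func (parentConn) UpdateState(state string) { fmt.Println("parent state:", state) }

// wrapper embeds the interface, so UpdateState is forwarded to the parent
// automatically, while ResolveNow is intercepted and redirected.
type wrapper struct {
	conn
	resolveNow func()
}

func (w wrapper) ResolveNow() { w.resolveNow() }

func main() {
	w := wrapper{conn: parentConn{}, resolveNow: func() { fmt.Println("wrapper: trigger DNS re-resolution") }}
	w.ResolveNow()         // wrapper: trigger DNS re-resolution
	w.UpdateState("READY") // parent state: READY (inherited)
}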


@ -21,24 +21,19 @@
package clusterresolver
import (
"bytes"
"context"
"encoding/json"
"fmt"
"testing"
"time"
"github.com/golang/protobuf/jsonpb"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
scpb "google.golang.org/grpc/internal/proto/grpc_service_config"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
@ -48,8 +43,8 @@ import (
const (
defaultTestTimeout = 1 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
testServiceName = "test/foo"
testClusterName = "test/cluster"
testEDSServcie = "test-eds-service-name"
testClusterName = "test-cluster-name"
)
var (
@ -105,7 +100,7 @@ func (t *noopTestClientConn) NewSubConn([]resolver.Address, balancer.NewSubConnO
return nil, nil
}
func (noopTestClientConn) Target() string { return testServiceName }
func (noopTestClientConn) Target() string { return testEDSServcie }
type scStateChange struct {
sc balancer.SubConn
@ -213,7 +208,7 @@ func (s) TestSubConnStateChange(t *testing.T) {
defer cleanup()
builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
@ -221,7 +216,7 @@ func (s) TestSubConnStateChange(t *testing.T) {
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
}); err != nil {
t.Fatalf("edsB.UpdateClientConnState() failed: %v", err)
}
@ -259,7 +254,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
defer cleanup()
builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
@ -269,7 +264,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
defer cancel()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
}); err != nil {
t.Fatal(err)
}
@ -323,7 +318,7 @@ func (s) TestErrorFromXDSClientUpdate(t *testing.T) {
// An update with the same service name should not trigger a new watch.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
}); err != nil {
t.Fatal(err)
}
@ -347,7 +342,7 @@ func (s) TestErrorFromResolver(t *testing.T) {
defer cleanup()
builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
@ -357,7 +352,7 @@ func (s) TestErrorFromResolver(t *testing.T) {
defer cancel()
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
}); err != nil {
t.Fatal(err)
}
@ -408,7 +403,7 @@ func (s) TestErrorFromResolver(t *testing.T) {
// the previous watch was canceled.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: testServiceName},
BalancerConfig: newLBConfigWithOneEDS(testEDSServcie),
}); err != nil {
t.Fatal(err)
}
@ -449,7 +444,7 @@ func (s) TestClientWatchEDS(t *testing.T) {
defer cleanup()
builder := balancer.Get(Name)
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}})
if edsB == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
@ -460,7 +455,7 @@ func (s) TestClientWatchEDS(t *testing.T) {
// If eds service name is not set, should watch for cluster name.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{ClusterName: "cluster-1"},
BalancerConfig: newLBConfigWithOneEDS("cluster-1"),
}); err != nil {
t.Fatal(err)
}
@ -472,7 +467,7 @@ func (s) TestClientWatchEDS(t *testing.T) {
// the same.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: "foobar-1"},
BalancerConfig: newLBConfigWithOneEDS("foobar-1"),
}); err != nil {
t.Fatal(err)
}
@ -486,7 +481,7 @@ func (s) TestClientWatchEDS(t *testing.T) {
// with no resource names being sent to the server.
if err := edsB.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{EDSServiceName: "foobar-2"},
BalancerConfig: newLBConfigWithOneEDS("foobar-2"),
}); err != nil {
t.Fatal(err)
}
@ -495,184 +490,12 @@ func (s) TestClientWatchEDS(t *testing.T) {
}
}
const (
fakeBalancerA = "fake_balancer_A"
fakeBalancerB = "fake_balancer_B"
)
// Install two fake balancers for service config update tests.
//
// ParseConfig only accepts the json if the balancer specified is registered.
func init() {
balancer.Register(&fakeBalancerBuilder{name: fakeBalancerA})
balancer.Register(&fakeBalancerBuilder{name: fakeBalancerB})
}
type fakeBalancerBuilder struct {
name string
}
func (b *fakeBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &fakeBalancer{cc: cc}
}
func (b *fakeBalancerBuilder) Name() string {
return b.name
}
type fakeBalancer struct {
cc balancer.ClientConn
}
func (b *fakeBalancer) ResolverError(error) {
panic("implement me")
}
func (b *fakeBalancer) UpdateClientConnState(balancer.ClientConnState) error {
panic("implement me")
}
func (b *fakeBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
panic("implement me")
}
func (b *fakeBalancer) Close() {}
func (s) TestBalancerConfigParsing(t *testing.T) {
const testEDSName = "eds.service"
var testLRSName = "lrs.server"
b := bytes.NewBuffer(nil)
if err := (&jsonpb.Marshaler{}).Marshal(b, &scpb.XdsConfig{
ChildPolicy: []*scpb.LoadBalancingConfig{
{Policy: &scpb.LoadBalancingConfig_Xds{}},
{Policy: &scpb.LoadBalancingConfig_RoundRobin{
RoundRobin: &scpb.RoundRobinConfig{},
}},
},
FallbackPolicy: []*scpb.LoadBalancingConfig{
{Policy: &scpb.LoadBalancingConfig_Xds{}},
{Policy: &scpb.LoadBalancingConfig_PickFirst{
PickFirst: &scpb.PickFirstConfig{},
}},
},
EdsServiceName: testEDSName,
LrsLoadReportingServerName: &wrapperspb.StringValue{Value: testLRSName},
}); err != nil {
t.Fatalf("%v", err)
}
var testMaxConcurrentRequests uint32 = 123
tests := []struct {
name string
js json.RawMessage
want serviceconfig.LoadBalancingConfig
wantErr bool
}{
{
name: "bad json",
js: json.RawMessage(`i am not JSON`),
wantErr: true,
},
{
name: "empty",
js: json.RawMessage(`{}`),
want: &EDSConfig{},
},
{
name: "jsonpb-generated",
js: b.Bytes(),
want: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "round_robin",
Config: json.RawMessage("{}"),
},
FallBackPolicy: &loadBalancingConfig{
Name: "pick_first",
Config: json.RawMessage("{}"),
},
EDSServiceName: testEDSName,
LrsLoadReportingServerName: &testLRSName,
},
},
{
// json with random balancers, and the first is not registered.
name: "manually-generated",
js: json.RawMessage(`
{
"childPolicy": [
{"fake_balancer_C": {}},
{"fake_balancer_A": {}},
{"fake_balancer_B": {}}
],
"fallbackPolicy": [
{"fake_balancer_C": {}},
{"fake_balancer_B": {}},
{"fake_balancer_A": {}}
],
"edsServiceName": "eds.service",
"maxConcurrentRequests": 123,
"lrsLoadReportingServerName": "lrs.server"
}`),
want: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "fake_balancer_A",
Config: json.RawMessage("{}"),
},
FallBackPolicy: &loadBalancingConfig{
Name: "fake_balancer_B",
Config: json.RawMessage("{}"),
},
EDSServiceName: testEDSName,
MaxConcurrentRequests: &testMaxConcurrentRequests,
LrsLoadReportingServerName: &testLRSName,
},
},
{
// json with no lrs server name, LoadReportingServerName should
// be nil (not an empty string).
name: "no-lrs-server-name",
js: json.RawMessage(`
{
"edsServiceName": "eds.service"
}`),
want: &EDSConfig{
EDSServiceName: testEDSName,
LrsLoadReportingServerName: nil,
},
},
{
name: "good child policy",
js: json.RawMessage(`{"childPolicy":[{"pick_first":{}}]}`),
want: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "pick_first",
Config: json.RawMessage(`{}`),
},
},
},
{
name: "multiple good child policies",
js: json.RawMessage(`{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}`),
want: &EDSConfig{
ChildPolicy: &loadBalancingConfig{
Name: "round_robin",
Config: json.RawMessage(`{}`),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := bb{}.ParseConfig(tt.js)
if (err != nil) != tt.wantErr {
t.Fatalf("edsBalancerBuilder.ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.wantErr {
return
}
if !cmp.Equal(got, tt.want) {
t.Errorf(cmp.Diff(got, tt.want))
}
})
func newLBConfigWithOneEDS(edsServiceName string) *LBConfig {
return &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: edsServiceName,
}},
}
}


@ -1,6 +1,6 @@
/*
*
* Copyright 2019 gRPC authors.
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,110 +19,44 @@ package clusterresolver
import (
"encoding/json"
"fmt"
"google.golang.org/grpc/balancer"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
)
// EDSConfig represents the loadBalancingConfig section of the service config
// for EDS balancers.
type EDSConfig struct {
serviceconfig.LoadBalancingConfig
// ChildPolicy represents the load balancing config for the child
// policy.
ChildPolicy *loadBalancingConfig
// FallBackPolicy represents the load balancing config for the
// fallback.
FallBackPolicy *loadBalancingConfig
// ClusterName is the cluster name.
ClusterName string
// EDSServiceName is the name to use in EDS query. If not set, use
// ClusterName.
EDSServiceName string
// MaxConcurrentRequests is the max number of concurrent requests allowed for
// this service. If unset, the default value of 1024 is used.
// LBConfig is the config for the cluster resolver balancer.
type LBConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`
// DiscoveryMechanisms is an ordered list of discovery mechanisms.
//
// Note that this is not defined in the service config proto. And the reason
// is, we are dropping EDS and moving the features into cluster_impl. But in
// the meantime, to keep things working, we need to add this field. And it
// should be fine to add this extra field here, because EDS is only used in
// CDS today, so we have full control.
MaxConcurrentRequests *uint32
// LRS server to send load reports to. If not present, load reporting
// will be disabled. If set to the empty string, load reporting will
// be sent to the same server that we obtained CDS data from.
LrsLoadReportingServerName *string
// Must have at least one element. Results from each discovery mechanism are
// concatenated together in successive priorities.
DiscoveryMechanisms []balancerconfig.DiscoveryMechanism `json:"discoveryMechanisms,omitempty"`
// LocalityPickingPolicy is policy for locality picking.
//
// This policy's config is expected to be in the format used by the
// weighted_target policy. Note that the config should include an empty
// value for the "targets" field; that empty value will be replaced by one
// that is dynamically generated based on the EDS data. Optional; defaults
// to "weighted_target".
LocalityPickingPolicy *internalserviceconfig.BalancerConfig `json:"localityPickingPolicy,omitempty"`
// EndpointPickingPolicy is policy for endpoint picking.
//
// This will be configured as the policy for each child in the
// locality-policy's config. Optional; defaults to "round_robin".
EndpointPickingPolicy *internalserviceconfig.BalancerConfig `json:"endpointPickingPolicy,omitempty"`
// TODO: read and warn if endpoint is not roundrobin or locality is not
// weightedtarget.
}
// edsConfigJSON is the intermediate unmarshal result of EDSConfig. ChildPolicy
// and FallbackPolicy are post-processed, and for each, the first installed
// policy is kept.
type edsConfigJSON struct {
ChildPolicy []*loadBalancingConfig
FallbackPolicy []*loadBalancingConfig
ClusterName string
EDSServiceName string
MaxConcurrentRequests *uint32
LRSLoadReportingServerName *string
}
// UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l.
// When unmarshalling, we iterate through the childPolicy/fallbackPolicy lists
// and select the first LB policy which has been registered.
func (l *EDSConfig) UnmarshalJSON(data []byte) error {
var configJSON edsConfigJSON
if err := json.Unmarshal(data, &configJSON); err != nil {
return err
func parseConfig(c json.RawMessage) (*LBConfig, error) {
var cfg LBConfig
if err := json.Unmarshal(c, &cfg); err != nil {
return nil, err
}
l.ClusterName = configJSON.ClusterName
l.EDSServiceName = configJSON.EDSServiceName
l.MaxConcurrentRequests = configJSON.MaxConcurrentRequests
l.LrsLoadReportingServerName = configJSON.LRSLoadReportingServerName
for _, lbcfg := range configJSON.ChildPolicy {
if balancer.Get(lbcfg.Name) != nil {
l.ChildPolicy = lbcfg
break
}
}
for _, lbcfg := range configJSON.FallbackPolicy {
if balancer.Get(lbcfg.Name) != nil {
l.FallBackPolicy = lbcfg
break
}
}
return nil
}
// MarshalJSON returns a JSON encoding of l.
func (l *EDSConfig) MarshalJSON() ([]byte, error) {
return nil, fmt.Errorf("EDSConfig.MarshalJSON() is unimplemented")
}
// loadBalancingConfig represents a single load balancing config,
// stored in JSON format.
type loadBalancingConfig struct {
Name string
Config json.RawMessage
}
// MarshalJSON returns a JSON encoding of l.
func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) {
return nil, fmt.Errorf("loadBalancingConfig.MarshalJSON() is unimplemented")
}
// UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l.
func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error {
var cfg map[string]json.RawMessage
if err := json.Unmarshal(data, &cfg); err != nil {
return err
}
for name, config := range cfg {
l.Name = name
l.Config = config
}
return nil
return &cfg, nil
}


@ -0,0 +1,165 @@
// +build go1.12
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package clusterresolver
import (
"testing"
"github.com/google/go-cmp/cmp"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
)
const (
testJSONConfig1 = `{
"discoveryMechanisms": [{
"cluster": "test-cluster-name",
"lrsLoadReportingServerName": "test-lrs-server",
"maxConcurrentRequests": 314,
"type": "EDS",
"edsServiceName": "test-eds-service-name"
}]
}`
testJSONConfig2 = `{
"discoveryMechanisms": [{
"cluster": "test-cluster-name",
"lrsLoadReportingServerName": "test-lrs-server",
"maxConcurrentRequests": 314,
"type": "EDS",
"edsServiceName": "test-eds-service-name"
},{
"type": "LOGICAL_DNS"
}]
}`
testJSONConfig3 = `{
"discoveryMechanisms": [{
"cluster": "test-cluster-name",
"lrsLoadReportingServerName": "test-lrs-server",
"maxConcurrentRequests": 314,
"type": "EDS",
"edsServiceName": "test-eds-service-name"
}],
"localityPickingPolicy":[{"pick_first":{}}],
"endpointPickingPolicy":[{"pick_first":{}}]
}`
testLRSServer = "test-lrs-server"
testMaxRequests = 314
)
func TestParseConfig(t *testing.T) {
tests := []struct {
name string
js string
want *LBConfig
wantErr bool
}{
{
name: "empty json",
js: "",
want: nil,
wantErr: true,
},
{
name: "OK with one discovery mechanism",
js: testJSONConfig1,
want: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{
{
Cluster: testClusterName,
LoadReportingServerName: newString(testLRSServer),
MaxConcurrentRequests: newUint32(testMaxRequests),
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: testEDSServcie,
},
},
LocalityPickingPolicy: nil,
EndpointPickingPolicy: nil,
},
wantErr: false,
},
{
name: "OK with multiple discovery mechanisms",
js: testJSONConfig2,
want: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{
{
Cluster: testClusterName,
LoadReportingServerName: newString(testLRSServer),
MaxConcurrentRequests: newUint32(testMaxRequests),
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: testEDSServcie,
},
{
Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS,
},
},
LocalityPickingPolicy: nil,
EndpointPickingPolicy: nil,
},
wantErr: false,
},
{
name: "OK with picking policy override",
js: testJSONConfig3,
want: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{
{
Cluster: testClusterName,
LoadReportingServerName: newString(testLRSServer),
MaxConcurrentRequests: newUint32(testMaxRequests),
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: testEDSServcie,
},
},
LocalityPickingPolicy: &internalserviceconfig.BalancerConfig{
Name: "pick_first",
Config: nil,
},
EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{
Name: "pick_first",
Config: nil,
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseConfig([]byte(tt.js))
if (err != nil) != tt.wantErr {
t.Errorf("parseConfig() error = %v, wantErr %v", err, tt.wantErr)
return
}
if diff := cmp.Diff(got, tt.want); diff != "" {
t.Errorf("parseConfig() got unexpected output, diff (-got +want): %v", diff)
}
})
}
}
func newString(s string) *string {
return &s
}
func newUint32(i uint32) *uint32 {
return &i
}


@ -1,49 +0,0 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package clusterresolver
import (
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
"google.golang.org/grpc/xds/internal/xdsclient"
)
const million = 1000000
func buildPriorityConfigJSON(edsResp xdsclient.EndpointsUpdate, c *EDSConfig) ([]byte, []resolver.Address, error) {
var childConfig *internalserviceconfig.BalancerConfig
if c.ChildPolicy != nil {
childConfig = &internalserviceconfig.BalancerConfig{Name: c.ChildPolicy.Name}
}
return balancerconfig.BuildPriorityConfigJSON(
[]balancerconfig.PriorityConfig{
{
Mechanism: balancerconfig.DiscoveryMechanism{
Cluster: c.ClusterName,
LoadReportingServerName: c.LrsLoadReportingServerName,
MaxConcurrentRequests: c.MaxConcurrentRequests,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
EDSServiceName: c.EDSServiceName,
},
EDSResp: edsResp,
},
}, childConfig,
)
}


@ -1,123 +0,0 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package clusterresolver
import (
"fmt"
"testing"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/xdsclient"
)
const (
localityCount = 4
addressPerLocality = 2
)
var (
testLocalityIDs []internal.LocalityID
testEndpoints [][]xdsclient.Endpoint
)
func init() {
for i := 0; i < localityCount; i++ {
testLocalityIDs = append(testLocalityIDs, internal.LocalityID{Zone: fmt.Sprintf("test-zone-%d", i)})
var ends []xdsclient.Endpoint
for j := 0; j < addressPerLocality; j++ {
addr := fmt.Sprintf("addr-%d-%d", i, j)
ends = append(ends, xdsclient.Endpoint{
Address: addr,
HealthStatus: xdsclient.EndpointHealthStatusHealthy,
})
}
testEndpoints = append(testEndpoints, ends)
}
}
// TestBuildPriorityConfigJSON is a sanity check that the generated config bytes
// are valid (can be parsed back to a config struct).
//
// The correctness is covered by the unmarshalled version
// TestBuildPriorityConfig.
func TestBuildPriorityConfigJSON(t *testing.T) {
const (
testClusterName = "cluster-name-for-watch"
testEDSServiceName = "service-name-from-parent"
testLRSServer = "lrs-addr-from-config"
testMaxReq = 314
testDropCategory = "test-drops"
testDropOverMillion = 1
)
for _, lrsServer := range []*string{newString(testLRSServer), newString(""), nil} {
got, _, err := buildPriorityConfigJSON(xdsclient.EndpointsUpdate{
Drops: []xdsclient.OverloadDropConfig{{
Category: testDropCategory,
Numerator: testDropOverMillion,
Denominator: million,
}},
Localities: []xdsclient.Locality{{
Endpoints: testEndpoints[3],
ID: testLocalityIDs[3],
Weight: 80,
Priority: 1,
}, {
Endpoints: testEndpoints[1],
ID: testLocalityIDs[1],
Weight: 80,
Priority: 0,
}, {
Endpoints: testEndpoints[2],
ID: testLocalityIDs[2],
Weight: 20,
Priority: 1,
}, {
Endpoints: testEndpoints[0],
ID: testLocalityIDs[0],
Weight: 20,
Priority: 0,
}}},
&EDSConfig{
ChildPolicy: &loadBalancingConfig{Name: roundrobin.Name},
ClusterName: testClusterName,
EDSServiceName: testEDSServiceName,
MaxConcurrentRequests: newUint32(testMaxReq),
LrsLoadReportingServerName: lrsServer,
},
)
if err != nil {
t.Fatalf("buildPriorityConfigJSON(...) failed: %v", err)
}
priorityB := balancer.Get(priority.Name)
if _, err = priorityB.(balancer.ConfigParser).ParseConfig(got); err != nil {
t.Fatalf("ParseConfig(%+v) failed: %v", got, err)
}
}
}
func newString(s string) *string {
return &s
}
func newUint32(i uint32) *uint32 {
return &i
}


@ -31,9 +31,11 @@ import (
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/stub"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/balancergroup"
"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/balancer/weightedtarget"
"google.golang.org/grpc/xds/internal/testutils"
@ -59,11 +61,11 @@ func init() {
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond * 100
}
func setupTestEDS(t *testing.T, initChild *loadBalancingConfig) (balancer.Balancer, *testutils.TestClientConn, *fakeclient.Client, func()) {
func setupTestEDS(t *testing.T, initChild *internalserviceconfig.BalancerConfig) (balancer.Balancer, *testutils.TestClientConn, *fakeclient.Client, func()) {
xdsC := fakeclient.NewClientWithName(testBalancerNameFooBar)
cc := testutils.NewTestClientConn(t)
builder := balancer.Get(Name)
edsb := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}})
edsb := builder.Build(cc, balancer.BuildOptions{Target: resolver.Target{Endpoint: testEDSServcie}})
if edsb == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
@ -71,9 +73,12 @@ func setupTestEDS(t *testing.T, initChild *loadBalancingConfig) (balancer.Balanc
defer cancel()
if err := edsb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{}, xdsC),
BalancerConfig: &EDSConfig{
ClusterName: testClusterName,
ChildPolicy: initChild,
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
}},
EndpointPickingPolicy: initChild,
},
}); err != nil {
edsb.Close()
@ -462,10 +467,17 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
stub.Register(balancerName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
if len(s.ResolverState.Addresses) == 0 {
return nil
m, _ := bd.Data.(map[string]bool)
if m == nil {
m = make(map[string]bool)
bd.Data = m
}
for _, addr := range s.ResolverState.Addresses {
if !m[addr.Addr] {
m[addr.Addr] = true
bd.ClientConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
}
}
bd.ClientConn.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{})
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
@ -477,9 +489,24 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
})
t.Logf("initialize with sub-balancer: stub-balancer")
edsb, cc, xdsC, cleanup := setupTestEDS(t, &loadBalancingConfig{Name: balancerName})
edsb, cc, xdsC, cleanup := setupTestEDS(t, &internalserviceconfig.BalancerConfig{Name: balancerName})
defer cleanup()
t.Logf("update sub-balancer to stub-balancer")
if err := edsb.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
}},
EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{
Name: balancerName,
},
},
}); err != nil {
t.Fatal(err)
}
// Two localities, each with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
@ -497,10 +524,19 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
t.Logf("update sub-balancer to round-robin")
if err := edsb.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{ClusterName: testClusterName, ChildPolicy: &loadBalancingConfig{Name: roundrobin.Name}},
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
}},
EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatal(err)
}
for i := 0; i < 2; i++ {
<-cc.RemoveSubConnCh
}
@ -518,10 +554,19 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
t.Logf("update sub-balancer to stub-balancer")
if err := edsb.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{ClusterName: testClusterName, ChildPolicy: &loadBalancingConfig{Name: balancerName}},
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
}},
EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{
Name: balancerName,
},
},
}); err != nil {
t.Fatal(err)
}
for i := 0; i < 2; i++ {
scToRemove := <-cc.RemoveSubConnCh
if !cmp.Equal(scToRemove, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) &&
@ -542,10 +587,19 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
t.Logf("update sub-balancer to round-robin")
if err := edsb.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{ClusterName: testClusterName, ChildPolicy: &loadBalancingConfig{Name: roundrobin.Name}},
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
}},
EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatal(err)
}
for i := 0; i < 2; i++ {
<-cc.RemoveSubConnCh
}
@ -568,14 +622,20 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) {
var maxRequests uint32 = 50
if err := edsb.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{
ChildPolicy: &loadBalancingConfig{Name: roundrobin.Name},
ClusterName: testClusterName,
MaxConcurrentRequests: &maxRequests,
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
MaxConcurrentRequests: &maxRequests,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
}},
EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatal(err)
}
// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
@ -628,14 +688,20 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) {
// update afterwards). Make sure the new picker uses the new configs.
var maxRequests2 uint32 = 10
if err := edsb.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &EDSConfig{
ChildPolicy: &loadBalancingConfig{Name: roundrobin.Name},
ClusterName: testClusterName,
MaxConcurrentRequests: &maxRequests2,
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{
Cluster: testClusterName,
MaxConcurrentRequests: &maxRequests2,
Type: balancerconfig.DiscoveryMechanismTypeEDS,
}},
EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
}); err != nil {
t.Fatal(err)
}
// Picks with drops.
dones = []func(){}
p2 := <-cc.NewPickerCh
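Note that MaxConcurrentRequests has moved from the top-level balancer config onto each DiscoveryMechanism, so two clusters in the same config can carry different circuit-breaking limits. A hedged sketch of that (cluster names and limits are illustrative):

limitA, limitB := uint32(50), uint32(10)
cfg := &LBConfig{
	DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{
		{Cluster: "cluster-a", Type: balancerconfig.DiscoveryMechanismTypeEDS, MaxConcurrentRequests: &limitA},
		{Cluster: "cluster-b", Type: balancerconfig.DiscoveryMechanismTypeEDS, MaxConcurrentRequests: &limitB},
	},
}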

View File

@ -1,87 +0,0 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package clusterresolver
import (
"google.golang.org/grpc/xds/internal/xdsclient"
)
// watchUpdate wraps the information received from a registered EDS watcher. A
// non-nil error is propagated to the underlying child balancer. A valid update
// results in creating a new child balancer (a priority balancer) if one
// doesn't already exist, and pushing the updated balancer config to it.
type watchUpdate struct {
eds xdsclient.EndpointsUpdate
err error
}
// edsWatcher takes an EDS balancer config, and uses the xds_client to watch EDS
// updates. The EDS updates are passed back to the balancer via a channel.
type edsWatcher struct {
parent *clusterResolverBalancer
updateChannel chan *watchUpdate
edsToWatch string
edsCancel func()
}
func (ew *edsWatcher) updateConfig(config *EDSConfig) {
// If EDSServiceName is set, use it to watch EDS. Otherwise, use the cluster
// name.
newEDSToWatch := config.EDSServiceName
if newEDSToWatch == "" {
newEDSToWatch = config.ClusterName
}
if ew.edsToWatch == newEDSToWatch {
return
}
// Restart EDS watch when the eds name to watch has changed.
ew.edsToWatch = newEDSToWatch
if ew.edsCancel != nil {
ew.edsCancel()
}
cancelEDSWatch := ew.parent.xdsClient.WatchEndpoints(newEDSToWatch, func(update xdsclient.EndpointsUpdate, err error) {
select {
case <-ew.updateChannel:
default:
}
ew.updateChannel <- &watchUpdate{eds: update, err: err}
})
ew.parent.logger.Infof("Watch started on resource name %v with xds-client %p", newEDSToWatch, ew.parent.xdsClient)
ew.edsCancel = func() {
cancelEDSWatch()
ew.parent.logger.Infof("Watch cancelled on resource name %v with xds-client %p", newEDSToWatch, ew.parent.xdsClient)
}
}
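The select-with-default before the send is a standard Go idiom for a capacity-1 update channel: drop any stale, unconsumed update so the watch callback never blocks and the reader always observes the latest value. In isolation (assuming, as with these watchers, a buffered channel of capacity 1 and a single writer):

package main

import "fmt"

// publish overwrites any pending value on a capacity-1 buffered channel.
func publish(ch chan int, v int) {
	select {
	case <-ch: // drop the stale pending update, if any
	default:
	}
	ch <- v // cannot block with a single writer: the buffer slot is now free
}

func main() {
	ch := make(chan int, 1)
	publish(ch, 1)
	publish(ch, 2)    // overwrites 1 before anyone reads it
	fmt.Println(<-ch) // prints 2
}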
// stopWatch stops the EDS watch.
//
// A subsequent call to updateConfig will restart the watch with the new name.
func (ew *edsWatcher) stopWatch() {
if ew.edsCancel != nil {
ew.edsCancel()
ew.edsCancel = nil
}
ew.edsToWatch = ""
}

View File

@ -28,6 +28,8 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig"
"google.golang.org/grpc/xds/internal/balancer/priority"
"google.golang.org/grpc/xds/internal/testutils"
)
@ -712,3 +714,107 @@ func (s) TestEDSPriority_FirstPriorityRemoved(t *testing.T) {
t.Fatal(err)
}
}
// Watch resources from EDS and DNS, with EDS as the higher priority. Lower
// priority is used when higher priority is not ready.
func (s) TestFallbackToDNS(t *testing.T) {
const testDNSEndpointAddr = "3.1.4.1:5"
dnsTargetCh, _, resolveNowCh, dnsR, cleanupDNS := setupDNS()
defer cleanupDNS()
edsb, cc, xdsC, cleanup := setupTestEDS(t, nil)
defer cleanup()
if err := edsb.UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: &LBConfig{
DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{
{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: testClusterName,
},
{
Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: testDNSTarget,
},
},
},
}); err != nil {
t.Fatal(err)
}
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
select {
case target := <-dnsTargetCh:
if diff := cmp.Diff(target, resolver.Target{Scheme: "dns", Endpoint: testDNSTarget}); diff != "" {
t.Fatalf("got unexpected DNS target to watch, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for building DNS resolver")
}
// One locality with one backend.
clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab1.Build()), nil)
// Also send a DNS update, because the balancer needs updates from all
// resources before it can proceed.
dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: testDNSEndpointAddr}}})
addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc0 := <-cc.NewSubConnCh
// p0 is ready.
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin with only p0 subconns.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc0}); err != nil {
t.Fatal(err)
}
// Turn down p0; p1 (DNS) will be used.
edsb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// The transient failure above should not trigger a re-resolve to the DNS
// resolver. Read from the channel to clear it, to avoid a potential
// deadlock when writing to it later.
shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer shortCancel()
select {
case <-resolveNowCh:
t.Fatal("unexpected re-resolve trigger by transient failure from EDS endpoint")
case <-shortCtx.Done():
}
// The address used to create the new SubConn should be the DNS endpoint.
addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testDNSEndpointAddr; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.NewSubConnCh
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test roundrobin pick with only sc1.
if err := testRoundRobinPickerFromCh(cc.NewPickerCh, []balancer.SubConn{sc1}); err != nil {
t.Fatal(err)
}
// Turn down the DNS endpoint; this should trigger a re-resolve in the DNS
// resolver.
edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// The transient failure above should trigger a re-resolve to the DNS
// resolver. Read from the channel to clear it, to avoid a potential
// deadlock when writing to it later.
select {
case <-resolveNowCh:
case <-ctx.Done():
t.Fatal("Timed out waiting for re-resolve")
}
}
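TestFallbackToDNS exercises the core of this commit: the DiscoveryMechanisms slice is ordered by priority, EDS endpoints populate p0, the DNS mechanism populates p1, and p1 only receives traffic once p0 reports TransientFailure. The selection rule, drastically simplified from the priority balancer's actual state machine, amounts to:

package main

import (
	"fmt"

	"google.golang.org/grpc/connectivity"
)

// pickPriority returns the first priority that has not failed; lower
// priorities are only consulted when every higher one is in
// TransientFailure. A sketch, not the real priority-balancer logic.
func pickPriority(states []connectivity.State) int {
	for i, s := range states {
		if s != connectivity.TransientFailure {
			return i
		}
	}
	return len(states) - 1 // all failing: stick with the last one
}

func main() {
	// p0 = EDS, p1 = DNS, mirroring the test above.
	fmt.Println(pickPriority([]connectivity.State{connectivity.Ready, connectivity.Idle}))             // 0: EDS serves
	fmt.Println(pickPriority([]connectivity.State{connectivity.TransientFailure, connectivity.Ready})) // 1: fall back to DNS
}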

View File

@ -28,8 +28,8 @@ import (
// resourceUpdate is a combined update from all the resources, in the order of
// priority. For example, it can be {EDS, EDS, DNS}.
type resourceUpdate struct {
p []balancerconfig.PriorityConfig
err error
priorities []balancerconfig.PriorityConfig
err error
}
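Concretely, the {EDS, EDS, DNS} example above would arrive as something like the following, with most fields elided:

u := &resourceUpdate{
	priorities: []balancerconfig.PriorityConfig{
		{Mechanism: balancerconfig.DiscoveryMechanism{Type: balancerconfig.DiscoveryMechanismTypeEDS}},
		{Mechanism: balancerconfig.DiscoveryMechanism{Type: balancerconfig.DiscoveryMechanismTypeEDS}},
		{Mechanism: balancerconfig.DiscoveryMechanism{Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS}},
	},
}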
type discoveryMechanism interface {
@ -197,7 +197,7 @@ func (rr *resourceResolver) generate() {
case <-rr.updateChannel:
default:
}
rr.updateChannel <- &resourceUpdate{p: ret}
rr.updateChannel <- &resourceUpdate{priorities: ret}
}
type edsDiscoveryMechanism struct {

View File

@ -62,14 +62,14 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) {
}{
{name: "watch EDS",
clusterName: testClusterName,
edsName: testServiceName,
wantName: testServiceName,
edsName: testEDSServcie,
wantName: testEDSServcie,
edsUpdate: testEDSUpdates[0],
want: []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: testClusterName,
EDSServiceName: testServiceName,
EDSServiceName: testEDSServcie,
},
EDSResp: testEDSUpdates[0],
}},
@ -110,7 +110,7 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) {
fakeClient.InvokeWatchEDSCallback("", test.edsUpdate, nil)
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, test.want); diff != "" {
if diff := cmp.Diff(u.priorities, test.want); diff != "" {
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
@ -123,7 +123,7 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) {
t.Fatalf("xdsClient.CancelCDS failed with error: %v", err)
}
if edsNameCanceled != test.wantName {
t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled, testServiceName)
t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled, testEDSServcie)
}
})
}
@ -192,7 +192,7 @@ func (s) TestResourceResolverOneDNSResource(t *testing.T) {
dnsR.UpdateState(resolver.State{Addresses: test.addrs})
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, test.want); diff != "" {
if diff := cmp.Diff(u.priorities, test.want); diff != "" {
t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
@ -224,7 +224,7 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) {
rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: testClusterName,
EDSServiceName: testServiceName,
EDSServiceName: testEDSServcie,
}})
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
@ -232,19 +232,19 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) {
if err != nil {
t.Fatalf("xdsClient.WatchCDS failed with error: %v", err)
}
if gotEDSName1 != testServiceName {
t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testServiceName)
if gotEDSName1 != testEDSServcie {
t.Fatalf("xdsClient.WatchEDS called for cluster: %v, want: %v", gotEDSName1, testEDSServcie)
}
// Invoke callback, should get an update.
fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil)
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: testClusterName,
EDSServiceName: testServiceName,
EDSServiceName: testEDSServcie,
},
EDSResp: testEDSUpdates[0],
}}); diff != "" {
@ -264,7 +264,7 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) {
t.Fatalf("xdsClient.CancelCDS failed with error: %v", err)
}
if edsNameCanceled1 != gotEDSName1 {
t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled1, testServiceName)
t.Fatalf("xdsClient.CancelEDS called for %v, want: %v", edsNameCanceled1, testEDSServcie)
}
gotEDSName2, err := fakeClient.WaitForWatchEDS(ctx)
if err != nil {
@ -287,7 +287,7 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) {
fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil)
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: testClusterName,
@ -314,7 +314,7 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) {
}
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: testClusterName,
@ -385,7 +385,7 @@ func (s) TestResourceResolverNoChangeNoUpdate(t *testing.T) {
fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil)
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{
{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
@ -497,7 +497,7 @@ func (s) TestResourceResolverChangePriority(t *testing.T) {
fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil)
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{
{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
@ -538,7 +538,7 @@ func (s) TestResourceResolverChangePriority(t *testing.T) {
}
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{
{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
@ -625,7 +625,7 @@ func (s) TestResourceResolverEDSAndDNS(t *testing.T) {
dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}})
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{
{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
@ -687,7 +687,7 @@ func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) {
fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil)
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeEDS,
Cluster: testClusterName,
@ -724,7 +724,7 @@ func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) {
dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}})
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: testDNSTarget,
@ -845,7 +845,7 @@ func (s) TestResourceResolverDNSResolveNow(t *testing.T) {
dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}})
select {
case u := <-rr.updateChannel:
if diff := cmp.Diff(u.p, []balancerconfig.PriorityConfig{{
if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{
Mechanism: balancerconfig.DiscoveryMechanism{
Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS,
DNSHostname: testDNSTarget,

View File

@ -29,7 +29,7 @@ import (
// Child is a child of priority balancer.
type Child struct {
Config *internalserviceconfig.BalancerConfig `json:"config,omitempty"`
IgnoreReresolutionRequests bool
IgnoreReresolutionRequests bool `json:"ignoreReresolutionRequests,omitempty"`
}
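The added json tag matters for config round-tripping: without it, encoding/json would emit the exported Go field name as-is, which the camelCase service-config convention would not match. A quick check of the new behavior (Child is reduced here to the tagged field for brevity):

package main

import (
	"encoding/json"
	"fmt"
)

// Child mirrors the struct above, minus the balancer config.
type Child struct {
	IgnoreReresolutionRequests bool `json:"ignoreReresolutionRequests,omitempty"`
}

func main() {
	b, _ := json.Marshal(Child{IgnoreReresolutionRequests: true})
	fmt.Println(string(b)) // {"ignoreReresolutionRequests":true}
}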
// LBConfig represents priority balancer's config.