diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java index 6ec4263f03..832e365b99 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java @@ -280,11 +280,13 @@ public final class CdsLoadBalancer extends LoadBalancer { newUpdate.getLbPolicy().equals("round_robin"), "The load balancing policy in ClusterUpdate '%s' is not supported", newUpdate); - final XdsConfig edsConfig = new XdsConfig( - new LbConfig(newUpdate.getLbPolicy(), ImmutableMap.of()), - /* fallbackPolicy = */ null, - /* edsServiceName = */ newUpdate.getEdsServiceName(), - /* lrsServerName = */ newUpdate.getLrsServerName()); + final XdsConfig edsConfig = + new XdsConfig( + /* cluster = */ newUpdate.getClusterName(), + new LbConfig(newUpdate.getLbPolicy(), ImmutableMap.of()), + /* fallbackPolicy = */ null, + /* edsServiceName = */ newUpdate.getEdsServiceName(), + /* lrsServerName = */ newUpdate.getLrsServerName()); updateSslContextProvider(newUpdate.getUpstreamTlsContext()); if (edsBalancer == null) { edsBalancer = lbRegistry.getProvider(EDS_POLICY_NAME).newLoadBalancer(helper); diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java index 1b439dcd2b..5331ed14cf 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java @@ -72,9 +72,6 @@ final class EdsLoadBalancer extends LoadBalancer { private XdsClient xdsClient; @Nullable private String clusterName; - // FIXME(chengyuanzhang): should be one instance per cluster:cluster_service. - @Nullable - private LoadStatsStore loadStatsStore; EdsLoadBalancer(Helper edsLbHelper, ResourceUpdateCallback resourceUpdateCallback) { this( @@ -159,6 +156,7 @@ final class EdsLoadBalancer extends LoadBalancer { XdsClient createXdsClient() { return new XdsClientImpl( + edsLbHelper.getAuthority(), serverList, channelFactory, node, @@ -173,41 +171,15 @@ final class EdsLoadBalancer extends LoadBalancer { xdsClient = xdsClientPool.getObject(); } - // The edsServiceName field is null in legacy gRPC client with EDS: use target authority for - // querying endpoints, but in the future we expect this to be explicitly given by EDS config. - // We assume if edsServiceName is null, it will always be null in later resolver updates; - // and if edsServiceName is not null, it will always be not null. - String clusterServiceName = newXdsConfig.edsServiceName; - if (clusterServiceName == null) { - clusterServiceName = edsLbHelper.getAuthority(); - } - if (clusterName == null) { - // TODO(zdapeng): Use the correct cluster name. Currently load reporting will be broken if - // edsServiceName is changed because we are using edsServiceName for the cluster name. - clusterName = clusterServiceName; - loadStatsStore = new LoadStatsStoreImpl(clusterName, null); - } - - // FIXME(chengyuanzhang): should report loads for each cluster:cluster_service. - if (xdsConfig == null - || !Objects.equals(newXdsConfig.lrsServerName, xdsConfig.lrsServerName)) { - if (newXdsConfig.lrsServerName != null) { - if (!newXdsConfig.lrsServerName.equals("")) { - throw new AssertionError( - "Can only report load to the same management server"); - } - xdsClient.reportClientStats(clusterName, null, loadStatsStore); - } else if (xdsConfig != null) { - xdsClient.cancelClientStatsReport(clusterName, null); - } - } + // FIXME(chengyuanzhang): make cluster name required in XdsConfig. + clusterName = newXdsConfig.cluster != null ? newXdsConfig.cluster : edsLbHelper.getAuthority(); // Note: childPolicy change will be handled in LocalityStore, to be implemented. // If edsServiceName in XdsConfig is changed, do a graceful switch. if (xdsConfig == null || !Objects.equals(newXdsConfig.edsServiceName, xdsConfig.edsServiceName)) { LoadBalancer.Factory clusterEndpointsLoadBalancerFactory = - new ClusterEndpointsBalancerFactory(clusterServiceName); + new ClusterEndpointsBalancerFactory(newXdsConfig.edsServiceName); switchingLoadBalancer.switchTo(clusterEndpointsLoadBalancerFactory); } switchingLoadBalancer.handleResolvedAddresses(resolvedAddresses); @@ -232,9 +204,6 @@ final class EdsLoadBalancer extends LoadBalancer { channelLogger.log(ChannelLogLevel.DEBUG, "EDS load balancer is shutting down"); switchingLoadBalancer.shutdown(); if (xdsClient != null) { - if (xdsConfig != null && xdsConfig.lrsServerName != null) { - xdsClient.cancelClientStatsReport(clusterName, null); - } xdsClient = xdsClientPool.returnObject(xdsClient); } } @@ -243,10 +212,12 @@ final class EdsLoadBalancer extends LoadBalancer { * A load balancer factory that provides a load balancer for a given cluster service. */ private final class ClusterEndpointsBalancerFactory extends LoadBalancer.Factory { - final String clusterServiceName; + @Nullable final String clusterServiceName; + final LoadStatsStore loadStatsStore; - ClusterEndpointsBalancerFactory(String clusterServiceName) { + ClusterEndpointsBalancerFactory(@Nullable String clusterServiceName) { this.clusterServiceName = clusterServiceName; + loadStatsStore = new LoadStatsStoreImpl(clusterName, clusterServiceName); } @Override @@ -260,7 +231,7 @@ final class EdsLoadBalancer extends LoadBalancer { return false; } ClusterEndpointsBalancerFactory that = (ClusterEndpointsBalancerFactory) o; - return clusterServiceName.equals(that.clusterServiceName); + return Objects.equals(clusterServiceName, that.clusterServiceName); } @Override @@ -272,20 +243,41 @@ final class EdsLoadBalancer extends LoadBalancer { * Load-balances endpoints for a given cluster. */ final class ClusterEndpointsBalancer extends LoadBalancer { + // Name of the resource to be used for querying endpoint information. + final String resourceName; final Helper helper; final EndpointWatcherImpl endpointWatcher; final LocalityStore localityStore; + boolean isReportingLoad; ClusterEndpointsBalancer(Helper helper) { this.helper = helper; - + resourceName = clusterServiceName != null ? clusterServiceName : clusterName; localityStore = localityStoreFactory.newLocalityStore(helper, lbRegistry, loadStatsStore); - endpointWatcher = new EndpointWatcherImpl(localityStore); - xdsClient.watchEndpointData(clusterServiceName, endpointWatcher); + xdsClient.watchEndpointData(resourceName, endpointWatcher); } - // TODO(zddapeng): In handleResolvedAddresses() handle child policy change if any. + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + XdsConfig config = (XdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); + if (config.lrsServerName != null) { + if (!config.lrsServerName.equals("")) { + throw new AssertionError( + "Can only report load to the same management server"); + } + if (!isReportingLoad) { + xdsClient.reportClientStats(clusterName, clusterServiceName, loadStatsStore); + isReportingLoad = true; + } + } else { + if (isReportingLoad) { + xdsClient.cancelClientStatsReport(clusterName, clusterServiceName); + isReportingLoad = false; + } + } + // TODO(zddapeng): In handleResolvedAddresses() handle child policy change if any. + } @Override public void handleNameResolutionError(Status error) { @@ -303,8 +295,12 @@ final class EdsLoadBalancer extends LoadBalancer { @Override public void shutdown() { + if (isReportingLoad) { + xdsClient.cancelClientStatsReport(clusterName, clusterServiceName); + isReportingLoad = false; + } localityStore.reset(); - xdsClient.cancelEndpointDataWatch(clusterServiceName, endpointWatcher); + xdsClient.cancelEndpointDataWatch(resourceName, endpointWatcher); } } } diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index aa0c3584f7..8fb069c485 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; import com.google.protobuf.util.Durations; import io.envoyproxy.envoy.api.v2.core.Node; import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; @@ -35,10 +37,8 @@ import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; import io.grpc.stub.StreamObserver; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; @@ -56,6 +56,9 @@ import javax.annotation.concurrent.NotThreadSafe; final class LoadReportClient { private static final Logger logger = Logger.getLogger(XdsClientImpl.class.getName()); + @VisibleForTesting + static final String TARGET_NAME_METADATA_KEY = "PROXYLESS_CLIENT_HOSTNAME"; + private final ManagedChannel channel; private final Node node; private final SynchronizationContext syncContext; @@ -64,10 +67,8 @@ final class LoadReportClient { private final Stopwatch retryStopwatch; private final BackoffPolicy.Provider backoffPolicyProvider; - // Sources of load stats data for each cluster. - // FIXME(chengyuanzhang): this should be Map> as each - // ClusterStats is keyed by cluster:cluster_service. Currently, cluster_service is always unset. - private final Map loadStatsStoreMap = new HashMap<>(); + // Sources of load stats data for each cluster:cluster_service. + private final Map> loadStatsStoreMap = new HashMap<>(); private boolean started; @Nullable @@ -80,6 +81,7 @@ final class LoadReportClient { private LoadReportCallback callback; LoadReportClient( + String targetName, ManagedChannel channel, Node node, SynchronizationContext syncContext, @@ -87,13 +89,21 @@ final class LoadReportClient { BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier) { this.channel = checkNotNull(channel, "channel"); - this.node = checkNotNull(node, "node"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.timerService = checkNotNull(scheduledExecutorService, "timeService"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.retryStopwatch = stopwatchSupplier.get(); - started = false; + checkNotNull(targetName, "targetName"); + checkNotNull(node, "node"); + Struct metadata = + node.getMetadata() + .toBuilder() + .putFields( + TARGET_NAME_METADATA_KEY, + Value.newBuilder().setStringValue(targetName).build()) + .build(); + this.node = node.toBuilder().setMetadata(metadata).build(); } /** @@ -101,7 +111,7 @@ final class LoadReportClient { * stats periodically. Calling this method on an already started {@link LoadReportClient} is * no-op. */ - public void startLoadReporting(LoadReportCallback callback) { + void startLoadReporting(LoadReportCallback callback) { if (started) { return; } @@ -114,7 +124,7 @@ final class LoadReportClient { * Terminates load reporting. Calling this method on an already stopped * {@link LoadReportClient} is no-op. */ - public void stopLoadReporting() { + void stopLoadReporting() { if (!started) { return; } @@ -132,37 +142,35 @@ final class LoadReportClient { * Provides this LoadReportClient source of load stats data for the given * cluster:cluster_service. If requested, data from the given loadStatsStore is * periodically queried and sent to traffic director by this LoadReportClient. - * - *

Currently we expect load stats data for all clusters to report loads for are provided - * before load reporting starts (so that LRS initial request tells management server clusters - * it is reporting loads for). Design TBD for reporting loads for extra clusters after load - * reporting has started. - * - *

Note: currently clusterServiceName is always unset. */ - public void addLoadStatsStore( + void addLoadStatsStore( String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) { checkState( - !loadStatsStoreMap.containsKey(clusterName), - "load stats for cluster " + clusterName + " already exists"); - // FIXME(chengyuanzhang): relax this restriction after design is fleshed out. - checkState( - !started, - "load stats for all clusters to report loads for should be provided before " - + "load reporting has started"); - loadStatsStoreMap.put(clusterName, loadStatsStore); + !loadStatsStoreMap.containsKey(clusterName) + || !loadStatsStoreMap.get(clusterName).containsKey(clusterServiceName), + "load stats for cluster: %s, cluster service: %s already exists", + clusterName, clusterServiceName); + if (!loadStatsStoreMap.containsKey(clusterName)) { + loadStatsStoreMap.put(clusterName, new HashMap()); + } + Map clusterLoadStatsStores = loadStatsStoreMap.get(clusterName); + clusterLoadStatsStores.put(clusterServiceName, loadStatsStore); } /** * Stops providing load stats data for the given cluster:cluster_service. - * - *

Note: currently clusterServiceName is always unset. */ - public void removeLoadStatsStore(String clusterName, @Nullable String clusterServiceName) { + void removeLoadStatsStore(String clusterName, @Nullable String clusterServiceName) { checkState( - loadStatsStoreMap.containsKey(clusterName), - "load stats for cluster " + clusterName + " does not exist"); - loadStatsStoreMap.remove(clusterName); + loadStatsStoreMap.containsKey(clusterName) + && loadStatsStoreMap.get(clusterName).containsKey(clusterServiceName), + "load stats for cluster: %s, cluster service: %s does not exist", + clusterName, clusterServiceName); + Map clusterLoadStatsStores = loadStatsStoreMap.get(clusterName); + clusterLoadStatsStores.remove(clusterServiceName); + if (clusterLoadStatsStores.isEmpty()) { + loadStatsStoreMap.remove(clusterName); + } } @VisibleForTesting @@ -217,15 +225,12 @@ final class LoadReportClient { void start() { lrsRequestWriter = stub.withWaitForReady().streamLoadStats(this); reportStopwatch.reset().start(); - // Tells management server which clusters the client is reporting loads for. - List clusterStatsList = new ArrayList<>(); - for (String clusterName : loadStatsStoreMap.keySet()) { - clusterStatsList.add(ClusterStats.newBuilder().setClusterName(clusterName).build()); - } + + // Send an initial LRS request with empty cluster stats. Management server is able to + // infer clusters the gRPC client sending loads to. LoadStatsRequest initRequest = LoadStatsRequest.newBuilder() .setNode(node) - .addAllClusterStats(clusterStatsList) .build(); lrsRequestWriter.onNext(initRequest); logger.log(Level.FINE, "Initial LRS request sent: {0}", initRequest); @@ -269,13 +274,15 @@ final class LoadReportClient { LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node); for (String name : clusterNames) { if (loadStatsStoreMap.containsKey(name)) { - LoadStatsStore loadStatsStore = loadStatsStoreMap.get(name); - ClusterStats report = - loadStatsStore.generateLoadReport() - .toBuilder() - .setLoadReportInterval(Durations.fromNanos(interval)) - .build(); - requestBuilder.addClusterStats(report); + Map clusterLoadStatsStores = loadStatsStoreMap.get(name); + for (LoadStatsStore statsStore : clusterLoadStatsStores.values()) { + ClusterStats report = + statsStore.generateLoadReport() + .toBuilder() + .setLoadReportInterval(Durations.fromNanos(interval)) + .build(); + requestBuilder.addClusterStats(report); + } } } LoadStatsRequest request = requestBuilder.build(); @@ -308,10 +315,16 @@ final class LoadReportClient { } else { logger.log(Level.FINE, "Received an LRS response: {0}", response); } - loadReportIntervalNano = Durations.toNanos(response.getLoadReportingInterval()); - callback.onReportResponse(loadReportIntervalNano); - clusterNames.clear(); - clusterNames.addAll(response.getClustersList()); + long interval = Durations.toNanos(response.getLoadReportingInterval()); + if (interval != loadReportIntervalNano) { + loadReportIntervalNano = interval; + callback.onReportResponse(loadReportIntervalNano); + } + if (clusterNames.size() != response.getClustersCount() + || !clusterNames.containsAll(response.getClustersList())) { + clusterNames.clear(); + clusterNames.addAll(response.getClustersList()); + } scheduleNextLoadReport(); } diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 3fc0fc0e36..d434ec9db0 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -112,19 +112,22 @@ abstract class XdsClient { */ static final class ClusterUpdate { private final String clusterName; + @Nullable private final String edsServiceName; private final String lbPolicy; - private final boolean enableLrs; + @Nullable private final String lrsServerName; private final UpstreamTlsContext upstreamTlsContext; - private ClusterUpdate(String clusterName, String edsServiceName, String lbPolicy, - boolean enableLrs, @Nullable String lrsServerName, + private ClusterUpdate( + String clusterName, + @Nullable String edsServiceName, + String lbPolicy, + @Nullable String lrsServerName, @Nullable UpstreamTlsContext upstreamTlsContext) { this.clusterName = clusterName; this.edsServiceName = edsServiceName; this.lbPolicy = lbPolicy; - this.enableLrs = enableLrs; this.lrsServerName = lrsServerName; this.upstreamTlsContext = upstreamTlsContext; } @@ -136,6 +139,7 @@ abstract class XdsClient { /** * Returns the resource name for EDS requests. */ + @Nullable String getEdsServiceName() { return edsServiceName; } @@ -148,16 +152,9 @@ abstract class XdsClient { return lbPolicy; } - /** - * Returns true if LRS is enabled. - */ - boolean isEnableLrs() { - return enableLrs; - } - /** * Returns the server name to send client load reports to if LRS is enabled. {@code null} if - * {@link #isEnableLrs()} returns {@code false}. + * load reporting is disabled for this cluster. */ @Nullable String getLrsServerName() { @@ -176,9 +173,9 @@ abstract class XdsClient { static final class Builder { private String clusterName; + @Nullable private String edsServiceName; private String lbPolicy; - private boolean enableLrs; @Nullable private String lrsServerName; @Nullable @@ -203,11 +200,6 @@ abstract class XdsClient { return this; } - Builder setEnableLrs(boolean enableLrs) { - this.enableLrs = enableLrs; - return this; - } - Builder setLrsServerName(String lrsServerName) { this.lrsServerName = lrsServerName; return this; @@ -221,13 +213,10 @@ abstract class XdsClient { ClusterUpdate build() { Preconditions.checkState(clusterName != null, "clusterName is not set"); Preconditions.checkState(lbPolicy != null, "lbPolicy is not set"); - Preconditions.checkState( - (enableLrs && lrsServerName != null) || (!enableLrs && lrsServerName == null), - "lrsServerName is not set while LRS is enabled " - + "OR lrsServerName is set while LRS is not enabled"); + return - new ClusterUpdate(clusterName, edsServiceName == null ? clusterName : edsServiceName, - lbPolicy, enableLrs, lrsServerName, upstreamTlsContext); + new ClusterUpdate( + clusterName, edsServiceName, lbPolicy, lrsServerName, upstreamTlsContext); } } } diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 257404e395..dfd787d292 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -88,6 +88,8 @@ final class XdsClientImpl extends XdsClient { private final MessagePrinter respPrinter = new MessagePrinter(); + // Name of the target server this gRPC client is trying to talk to. + private final String targetName; private final ManagedChannel channel; private final SynchronizationContext syncContext; private final ScheduledExecutorService timeService; @@ -162,6 +164,7 @@ final class XdsClientImpl extends XdsClient { private String ldsResourceName; XdsClientImpl( + String targetName, List servers, // list of management servers XdsChannelFactory channelFactory, Node node, @@ -169,6 +172,7 @@ final class XdsClientImpl extends XdsClient { ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier) { + this.targetName = checkNotNull(targetName, "targetName"); this.channel = checkNotNull(channelFactory, "channelFactory") .createChannel(checkNotNull(servers, "servers")); @@ -407,29 +411,30 @@ final class XdsClientImpl extends XdsClient { @Override void reportClientStats( String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) { - checkState(lrsClient == null, - "load reporting has already started, cannot change clusters to report loads for"); - lrsClient = - new LoadReportClient( - channel, - node, - syncContext, - timeService, - backoffPolicyProvider, - stopwatchSupplier); + if (lrsClient == null) { + lrsClient = + new LoadReportClient( + targetName, + channel, + node, + syncContext, + timeService, + backoffPolicyProvider, + stopwatchSupplier); + lrsClient.startLoadReporting(new LoadReportCallback() { + @Override + public void onReportResponse(long reportIntervalNano) {} + }); + } lrsClient.addLoadStatsStore(clusterName, clusterServiceName, loadStatsStore); - lrsClient.startLoadReporting(new LoadReportCallback() { - @Override - public void onReportResponse(long reportIntervalNano) {} - }); } @Override void cancelClientStatsReport(String clusterName, @Nullable String clusterServiceName) { checkState(lrsClient != null, "load reporting was never started"); lrsClient.removeLoadStatsStore(clusterName, clusterServiceName); - lrsClient.stopLoadReporting(); - lrsClient = null; + // TODO(chengyuanzhang): can be optimized to stop load reporting if no more loads need + // to be reported. } /** @@ -740,8 +745,7 @@ final class XdsClientImpl extends XdsClient { + "indicate to use EDS over ADS."; break; } - // If the service_name field is set, that value will be used for the EDS request - // instead of the cluster name (default). + // If the service_name field is set, that value will be used for the EDS request. if (!edsClusterConfig.getServiceName().isEmpty()) { updateBuilder.setEdsServiceName(edsClusterConfig.getServiceName()); edsServices.add(edsClusterConfig.getServiceName()); @@ -764,10 +768,7 @@ final class XdsClientImpl extends XdsClient { + "management server."; break; } - updateBuilder.setEnableLrs(true); updateBuilder.setLrsServerName(""); - } else { - updateBuilder.setEnableLrs(false); } if (cluster.hasTlsContext()) { updateBuilder.setUpstreamTlsContext(cluster.getTlsContext()); diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java index 7b3faf67dd..08de69c664 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancerProvider.java @@ -77,13 +77,14 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider { static ConfigOrError parseLoadBalancingConfigPolicy( Map rawLoadBalancingPolicyConfig, LoadBalancerRegistry registry) { try { + String cluster = JsonUtil.getString(rawLoadBalancingPolicyConfig, "cluster"); LbConfig childPolicy = selectChildPolicy(rawLoadBalancingPolicyConfig, registry); LbConfig fallbackPolicy = selectFallbackPolicy(rawLoadBalancingPolicyConfig, registry); String edsServiceName = JsonUtil.getString(rawLoadBalancingPolicyConfig, "edsServiceName"); String lrsServerName = JsonUtil.getString(rawLoadBalancingPolicyConfig, "lrsLoadReportingServerName"); return ConfigOrError.fromConfig( - new XdsConfig(childPolicy, fallbackPolicy, edsServiceName, lrsServerName)); + new XdsConfig(cluster, childPolicy, fallbackPolicy, edsServiceName, lrsServerName)); } catch (RuntimeException e) { return ConfigOrError.fromError( Status.fromThrowable(e).withDescription( @@ -128,6 +129,9 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider { * Represents a successfully parsed and validated LoadBalancingConfig for XDS. */ static final class XdsConfig { + // FIXME(chengyuanzhang): make cluster name required. + @Nullable + final String cluster; // TODO(carl-mastrangelo): make these Object's containing the fully parsed child configs. @Nullable final LbConfig childPolicy; @@ -144,10 +148,12 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider { final String lrsServerName; XdsConfig( + @Nullable String cluster, @Nullable LbConfig childPolicy, @Nullable LbConfig fallbackPolicy, @Nullable String edsServiceName, @Nullable String lrsServerName) { + this.cluster = cluster; this.childPolicy = childPolicy; this.fallbackPolicy = fallbackPolicy; this.edsServiceName = edsServiceName; @@ -157,6 +163,7 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider { @Override public String toString() { return MoreObjects.toStringHelper(this) + .add("cluster", cluster) .add("childPolicy", childPolicy) .add("fallbackPolicy", fallbackPolicy) .add("edsServiceName", edsServiceName) @@ -170,7 +177,8 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider { return false; } XdsConfig that = (XdsConfig) obj; - return Objects.equal(this.childPolicy, that.childPolicy) + return Objects.equal(this.cluster, that.cluster) + && Objects.equal(this.childPolicy, that.childPolicy) && Objects.equal(this.fallbackPolicy, that.fallbackPolicy) && Objects.equal(this.edsServiceName, that.edsServiceName) && Objects.equal(this.lrsServerName, that.lrsServerName); @@ -178,7 +186,7 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider { @Override public int hashCode() { - return Objects.hashCode(childPolicy, fallbackPolicy, edsServiceName, lrsServerName); + return Objects.hashCode(cluster, childPolicy, fallbackPolicy, edsServiceName, lrsServerName); } } } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index cb1651363d..ea142a707f 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -123,6 +123,7 @@ final class XdsNameResolver extends NameResolver { XdsClient createXdsClient() { return new XdsClientImpl( + authority, serverList, channelFactory, node, diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java index 912ad72d94..75b6db1ea0 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java @@ -198,7 +198,6 @@ public class CdsLoadBalancerTest { .setClusterName("foo.googleapis.com") .setEdsServiceName("edsServiceFoo.googleapis.com") .setLbPolicy("round_robin") - .setEnableLrs(false) .build()); verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); @@ -230,7 +229,6 @@ public class CdsLoadBalancerTest { .setClusterName("foo.googleapis.com") .setEdsServiceName("edsServiceFoo.googleapis.com") .setLbPolicy("round_robin") - .setEnableLrs(false) .build()); assertThat(edsLbHelpers).hasSize(1); @@ -240,6 +238,7 @@ public class CdsLoadBalancerTest { ArgumentCaptor resolvedAddressesCaptor1 = ArgumentCaptor.forClass(null); verify(edsLoadBalancer1).handleResolvedAddresses(resolvedAddressesCaptor1.capture()); XdsConfig expectedXdsConfig = new XdsConfig( + "foo.googleapis.com", new LbConfig("round_robin", ImmutableMap.of()), null, "edsServiceFoo.googleapis.com", @@ -271,7 +270,6 @@ public class CdsLoadBalancerTest { .setClusterName("bar.googleapis.com") .setEdsServiceName("edsServiceBar.googleapis.com") .setLbPolicy("round_robin") - .setEnableLrs(true) .setLrsServerName("lrsBar.googleapis.com") .build()); @@ -282,6 +280,7 @@ public class CdsLoadBalancerTest { ArgumentCaptor resolvedAddressesCaptor2 = ArgumentCaptor.forClass(null); verify(edsLoadBalancer2).handleResolvedAddresses(resolvedAddressesCaptor2.capture()); expectedXdsConfig = new XdsConfig( + "bar.googleapis.com", new LbConfig("round_robin", ImmutableMap.of()), null, "edsServiceBar.googleapis.com", @@ -307,10 +306,10 @@ public class CdsLoadBalancerTest { .setClusterName("bar.googleapis.com") .setEdsServiceName("edsServiceBar2.googleapis.com") .setLbPolicy("round_robin") - .setEnableLrs(false) .build()); verify(edsLoadBalancer2, times(2)).handleResolvedAddresses(resolvedAddressesCaptor2.capture()); expectedXdsConfig = new XdsConfig( + "bar.googleapis.com", new LbConfig("round_robin", ImmutableMap.of()), null, "edsServiceBar2.googleapis.com", @@ -358,7 +357,6 @@ public class CdsLoadBalancerTest { .setClusterName("foo.googleapis.com") .setEdsServiceName("edsServiceFoo.googleapis.com") .setLbPolicy("round_robin") - .setEnableLrs(false) .setUpstreamTlsContext(upstreamTlsContext) .build()); @@ -391,7 +389,6 @@ public class CdsLoadBalancerTest { .setClusterName("bar.googleapis.com") .setEdsServiceName("eds1ServiceFoo.googleapis.com") .setLbPolicy("round_robin") - .setEnableLrs(false) .setUpstreamTlsContext(upstreamTlsContext) .build()); @@ -416,7 +413,6 @@ public class CdsLoadBalancerTest { .setClusterName("bar.googleapis.com") .setEdsServiceName("eds1ServiceFoo.googleapis.com") .setLbPolicy("round_robin") - .setEnableLrs(false) .setUpstreamTlsContext(upstreamTlsContext1) .build()); @@ -436,7 +432,6 @@ public class CdsLoadBalancerTest { .setClusterName("bar.googleapis.com") .setEdsServiceName("eds1ServiceFoo.googleapis.com") .setLbPolicy("round_robin") - .setEnableLrs(false) .setUpstreamTlsContext(null) .build()); verify(mockTlsContextManager).releaseClientSslContextProvider(same(mockSslContextProvider1)); @@ -501,7 +496,6 @@ public class CdsLoadBalancerTest { .setClusterName("foo.googleapis.com") .setEdsServiceName("edsServiceFoo.googleapis.com") .setLbPolicy("round_robin") - .setEnableLrs(false) .build()); assertThat(edsLbHelpers).hasSize(1); @@ -542,7 +536,6 @@ public class CdsLoadBalancerTest { .setClusterName("foo.googleapis.com") .setEdsServiceName("edsServiceFoo.googleapis.com") .setLbPolicy("round_robin") - .setEnableLrs(false) .build()); ArgumentCaptor endpointWatcherCaptor = ArgumentCaptor.forClass(null); diff --git a/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java index e69490906e..adcae1c52d 100644 --- a/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/EdsLoadBalancerTest.java @@ -108,6 +108,7 @@ import org.mockito.junit.MockitoRule; @RunWith(Parameterized.class) public class EdsLoadBalancerTest { + private static final String CLUSTER_NAME = "eds-lb-test.example.com"; private static final String SERVICE_AUTHORITY = "test.authority.example.com"; @Rule @@ -234,8 +235,13 @@ public class EdsLoadBalancerTest { if (isFullFlow) { xdsClientPoolFromResolveAddresses = new FakeXdsClientPool( new XdsClientImpl( - serverList, channelFactory, Node.getDefaultInstance(), syncContext, - fakeClock.getScheduledExecutorService(), mock(BackoffPolicy.Provider.class), + SERVICE_AUTHORITY, + serverList, + channelFactory, + Node.getDefaultInstance(), + syncContext, + fakeClock.getScheduledExecutorService(), + mock(BackoffPolicy.Provider.class), fakeClock.getStopwatchSupplier())); } @@ -265,7 +271,7 @@ public class EdsLoadBalancerTest { @Test public void handleNameResolutionErrorBeforeAndAfterEdsWorkding() { - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, null, null)); // handleResolutionError() before receiving any endpoint update. edsLb.handleNameResolutionError(Status.DATA_LOSS.withDescription("fake status")); @@ -273,7 +279,7 @@ public class EdsLoadBalancerTest { // Endpoint update received. ClusterLoadAssignment clusterLoadAssignment = - buildClusterLoadAssignment("edsServiceName1", + buildClusterLoadAssignment(CLUSTER_NAME, ImmutableList.of( buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( @@ -293,7 +299,7 @@ public class EdsLoadBalancerTest { public void handleEdsServiceNameChangeInXdsConfig() { assertThat(childHelpers).isEmpty(); - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName1", null)); ClusterLoadAssignment clusterLoadAssignment = buildClusterLoadAssignment("edsServiceName1", ImmutableList.of( @@ -313,7 +319,7 @@ public class EdsLoadBalancerTest { assertLatestConnectivityState(CONNECTING); // Change edsServicename to edsServiceName2. - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName2", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName2", null)); // The old balancer was not READY, so it will be shutdown immediately. verify(childBalancer1).shutdown(); @@ -343,7 +349,7 @@ public class EdsLoadBalancerTest { assertLatestSubchannelPicker(subchannel2); // Change edsServiceName to edsServiceName3. - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName3", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName3", null)); clusterLoadAssignment = buildClusterLoadAssignment("edsServiceName3", ImmutableList.of( @@ -369,7 +375,7 @@ public class EdsLoadBalancerTest { assertLatestConnectivityState(CONNECTING); // Change edsServiceName to edsServiceName4. - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName4", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName4", null)); verify(childBalancer3).shutdown(); clusterLoadAssignment = @@ -397,7 +403,7 @@ public class EdsLoadBalancerTest { assertLatestSubchannelPicker(subchannel4); // Change edsServiceName to edsServiceName5. - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName5", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName5", null)); clusterLoadAssignment = buildClusterLoadAssignment("edsServiceName5", ImmutableList.of( @@ -432,13 +438,13 @@ public class EdsLoadBalancerTest { @Test public void firstAndSecondEdsResponseReceived_onWorkingCalledOnce() { - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, null, null)); verify(resourceUpdateCallback, never()).onWorking(); // first EDS response ClusterLoadAssignment clusterLoadAssignment = - buildClusterLoadAssignment("edsServiceName1", + buildClusterLoadAssignment(CLUSTER_NAME, ImmutableList.of( buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( @@ -451,7 +457,7 @@ public class EdsLoadBalancerTest { // second EDS response clusterLoadAssignment = - buildClusterLoadAssignment("edsServiceName1", + buildClusterLoadAssignment(CLUSTER_NAME, ImmutableList.of( buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( @@ -466,10 +472,10 @@ public class EdsLoadBalancerTest { @Test public void handleAllDropUpdates_pickersAreDropped() { - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, null, null)); ClusterLoadAssignment clusterLoadAssignment = buildClusterLoadAssignment( - "edsServiceName1", + CLUSTER_NAME, ImmutableList.of( buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( @@ -496,7 +502,7 @@ public class EdsLoadBalancerTest { assertLatestSubchannelPicker(subchannel); clusterLoadAssignment = buildClusterLoadAssignment( - "edsServiceName1", + CLUSTER_NAME, ImmutableList.of( buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( @@ -519,7 +525,7 @@ public class EdsLoadBalancerTest { @Test public void handleLocalityAssignmentUpdates_pickersUpdatedFromChildBalancer() { - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, null, null)); LbEndpoint endpoint11 = buildLbEndpoint("addr11.example.com", 8011, HEALTHY, 11); LbEndpoint endpoint12 = buildLbEndpoint("addr12.example.com", 8012, HEALTHY, 12); @@ -545,7 +551,7 @@ public class EdsLoadBalancerTest { 0); ClusterLoadAssignment clusterLoadAssignment = buildClusterLoadAssignment( - "edsServiceName1", + CLUSTER_NAME, ImmutableList.of(localityLbEndpoints1, localityLbEndpoints2, localityLbEndpoints3), ImmutableList.of()); receiveEndpointUpdate(clusterLoadAssignment); @@ -606,7 +612,7 @@ public class EdsLoadBalancerTest { helper, resourceUpdateCallback, lbRegistry, localityStoreFactory, bootstrapper, channelFactory); - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName1", null)); assertThat(localityStores).hasSize(1); LocalityStore localityStore = localityStores.peekLast(); @@ -643,7 +649,7 @@ public class EdsLoadBalancerTest { verify(localityStore).updateLocalityStore(endpointUpdate.getLocalityLbEndpointsMap()); // Change cluster name. - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName2", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName2", null)); assertThat(localityStores).hasSize(2); localityStore = localityStores.peekLast(); @@ -666,7 +672,7 @@ public class EdsLoadBalancerTest { @Test public void verifyErrorPropagation_noPreviousEndpointUpdateReceived() { - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, null, null)); verify(resourceUpdateCallback, never()).onError(); // Forwarding 20 seconds so that the xds client will deem EDS resource not available. @@ -677,10 +683,10 @@ public class EdsLoadBalancerTest { @Test public void verifyErrorPropagation_withPreviousEndpointUpdateReceived() { - deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null)); + deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, null, null)); // Endpoint update received. ClusterLoadAssignment clusterLoadAssignment = - buildClusterLoadAssignment("edsServiceName1", + buildClusterLoadAssignment(CLUSTER_NAME, ImmutableList.of( buildLocalityLbEndpoints("region1", "zone1", "subzone1", ImmutableList.of( diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java index 09a726104e..06afd8567e 100644 --- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java @@ -31,6 +31,8 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; import com.google.protobuf.util.Durations; import io.envoyproxy.envoy.api.v2.core.Locality; import io.envoyproxy.envoy.api.v2.core.Node; @@ -53,7 +55,6 @@ import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.LoadReportClient.LoadReportCallback; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -61,6 +62,7 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -79,7 +81,17 @@ import org.mockito.MockitoAnnotations; */ @RunWith(JUnit4.class) public class LoadReportClientTest { - private static final Node NODE = Node.newBuilder().setId("LRS test").build(); + private static final String TARGET_NAME = "lrs-test.example.com"; + // bootstrap node identifier + private static final Node NODE = + Node.newBuilder() + .setId("LRS test") + .setMetadata( + Struct.newBuilder() + .putFields( + "TRAFFICDIRECTOR_NETWORK_HOSTNAME", + Value.newBuilder().setStringValue("default").build())) + .build(); private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER = new FakeClock.TaskFilter() { @Override @@ -167,11 +179,14 @@ public class LoadReportClientTest { .thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L)); lrsClient = new LoadReportClient( + TARGET_NAME, channel, - NODE, syncContext, + NODE, + syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider, fakeClock.getStopwatchSupplier()); + lrsClient.startLoadReporting(callback); } @After @@ -182,22 +197,17 @@ public class LoadReportClientTest { @Test public void typicalWorkflow() { - String cluster1 = "cluster-foo.googleapis.com"; - String cluster2 = "cluster-bar.googleapis.com"; - ClusterStats rawStats1 = generateClusterLoadStats(cluster1); - ClusterStats rawStats2 = generateClusterLoadStats(cluster2); - when(loadStatsStore1.generateLoadReport()).thenReturn(rawStats1); - when(loadStatsStore2.generateLoadReport()).thenReturn(rawStats2); - lrsClient.addLoadStatsStore(cluster1, null, loadStatsStore1); - lrsClient.addLoadStatsStore(cluster2, null, loadStatsStore2); - lrsClient.startLoadReporting(callback); - verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture()); StreamObserver responseObserver = lrsResponseObserverCaptor.getValue(); StreamObserver requestObserver = Iterables.getOnlyElement(lrsRequestObservers); InOrder inOrder = inOrder(requestObserver, callback); - inOrder.verify(requestObserver).onNext(eq(buildInitialRequest(cluster1, cluster2))); + inOrder.verify(requestObserver).onNext(eq(buildInitialRequest())); + + String cluster1 = "cluster-foo.googleapis.com"; + ClusterStats rawStats1 = generateClusterLoadStats(cluster1, null); + when(loadStatsStore1.generateLoadReport()).thenReturn(rawStats1); + lrsClient.addLoadStatsStore(cluster1, null, loadStatsStore1); // Management server asks to report loads for cluster1. responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 1000)); @@ -213,7 +223,13 @@ public class LoadReportClientTest { fakeClock.forwardNanos(1000); inOrder.verify(requestObserver).onNext(argThat(expectedLoadReportMatcher)); - // Management server updates the interval of sending load reports. + String cluster2 = "cluster-bar.googleapis.com"; + ClusterStats rawStats2 = generateClusterLoadStats(cluster2, null); + when(loadStatsStore2.generateLoadReport()).thenReturn(rawStats2); + lrsClient.addLoadStatsStore(cluster2, null, loadStatsStore2); + + // Management server updates the interval of sending load reports, while still asking for + // loads to cluster1 only. responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 2000)); inOrder.verify(callback).onReportResponse(2000); @@ -226,7 +242,6 @@ public class LoadReportClientTest { // Management server asks to report loads for cluster1 and cluster2. responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1, cluster2), 2000)); - inOrder.verify(callback).onReportResponse(2000); fakeClock.forwardNanos(2000); inOrder.verify(requestObserver) @@ -236,7 +251,6 @@ public class LoadReportClientTest { // Load reports for cluster1 is no longer wanted. responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster2), 2000)); - inOrder.verify(callback).onReportResponse(2000); fakeClock.forwardNanos(2000); inOrder.verify(requestObserver) @@ -245,7 +259,6 @@ public class LoadReportClientTest { // Management server asks loads for a cluster that client has no load data. responseObserver .onNext(buildLrsResponse(ImmutableList.of("cluster-unknown.googleapis.com"), 2000)); - inOrder.verify(callback).onReportResponse(2000); fakeClock.forwardNanos(2000); ArgumentCaptor reportCaptor = ArgumentCaptor.forClass(null); @@ -257,12 +270,6 @@ public class LoadReportClientTest { @Test public void lrsStreamClosedAndRetried() { - String clusterName = "cluster-foo.googleapis.com"; - ClusterStats stats = generateClusterLoadStats(clusterName); - when(loadStatsStore1.generateLoadReport()).thenReturn(stats); - lrsClient.addLoadStatsStore(clusterName, null, loadStatsStore1); - lrsClient.startLoadReporting(callback); - InOrder inOrder = inOrder(mockLoadReportingService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2); inOrder.verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture()); @@ -270,8 +277,14 @@ public class LoadReportClientTest { assertThat(lrsRequestObservers).hasSize(1); StreamObserver requestObserver = lrsRequestObservers.poll(); + String clusterName = "cluster-foo.googleapis.com"; + String clusterServiceName = "service-blade.googleapis.com"; + ClusterStats stats = generateClusterLoadStats(clusterName, clusterServiceName); + when(loadStatsStore1.generateLoadReport()).thenReturn(stats); + lrsClient.addLoadStatsStore(clusterName, null, loadStatsStore1); + // First balancer RPC - verify(requestObserver).onNext(eq(buildInitialRequest(clusterName))); + verify(requestObserver).onNext(eq(buildInitialRequest())); assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER)); // Balancer closes it immediately (erroneously) @@ -291,7 +304,7 @@ public class LoadReportClientTest { responseObserver = lrsResponseObserverCaptor.getValue(); assertThat(lrsRequestObservers).hasSize(1); requestObserver = lrsRequestObservers.poll(); - verify(requestObserver).onNext(eq(buildInitialRequest(clusterName))); + verify(requestObserver).onNext(eq(buildInitialRequest())); assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER)); // Balancer closes it with an error. @@ -310,7 +323,7 @@ public class LoadReportClientTest { responseObserver = lrsResponseObserverCaptor.getValue(); assertThat(lrsRequestObservers).hasSize(1); requestObserver = lrsRequestObservers.poll(); - verify(requestObserver).onNext(eq(buildInitialRequest(clusterName))); + verify(requestObserver).onNext(eq(buildInitialRequest())); assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER)); // Balancer sends a response asking for loads of the cluster. @@ -326,7 +339,7 @@ public class LoadReportClientTest { responseObserver = lrsResponseObserverCaptor.getValue(); assertThat(lrsRequestObservers).hasSize(1); requestObserver = lrsRequestObservers.poll(); - verify(requestObserver).onNext(eq(buildInitialRequest(clusterName))); + verify(requestObserver).onNext(eq(buildInitialRequest())); // Fail the retry after spending 4ns fakeClock.forwardNanos(4); @@ -344,7 +357,7 @@ public class LoadReportClientTest { inOrder.verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture()); assertThat(lrsRequestObservers).hasSize(1); requestObserver = lrsRequestObservers.poll(); - verify(requestObserver).onNext(eq(buildInitialRequest(clusterName))); + verify(requestObserver).onNext(eq(buildInitialRequest())); assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER)); // Load reporting back to normal. @@ -363,19 +376,19 @@ public class LoadReportClientTest { @Test public void raceBetweenLoadReportingAndLbStreamClosure() { - String clusterName = "cluster-foo.googleapis.com"; - ClusterStats stats = generateClusterLoadStats(clusterName); - when(loadStatsStore1.generateLoadReport()).thenReturn(stats); - lrsClient.addLoadStatsStore(clusterName, null, loadStatsStore1); - lrsClient.startLoadReporting(callback); - verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture()); StreamObserver responseObserver = lrsResponseObserverCaptor.getValue(); assertThat(lrsRequestObservers).hasSize(1); StreamObserver requestObserver = lrsRequestObservers.poll(); + String clusterName = "cluster-foo.googleapis.com"; + String clusterServiceName = "service-blade.googleapis.com"; + ClusterStats stats = generateClusterLoadStats(clusterName, clusterServiceName); + when(loadStatsStore1.generateLoadReport()).thenReturn(stats); + lrsClient.addLoadStatsStore(clusterName, null, loadStatsStore1); + // First balancer RPC - verify(requestObserver).onNext(eq(buildInitialRequest(clusterName))); + verify(requestObserver).onNext(eq(buildInitialRequest())); assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER)); // Simulate receiving a response from traffic director. @@ -412,19 +425,28 @@ public class LoadReportClientTest { .build(); } - private static LoadStatsRequest buildInitialRequest(String... clusters) { - List clusterStatsList = new ArrayList<>(); - for (String cluster : clusters) { - clusterStatsList.add(ClusterStats.newBuilder().setClusterName(cluster).build()); - } + private static LoadStatsRequest buildInitialRequest() { return - LoadStatsRequest.newBuilder().setNode(NODE).addAllClusterStats(clusterStatsList).build(); + LoadStatsRequest.newBuilder() + .setNode( + Node.newBuilder() + .setId("LRS test") + .setMetadata( + Struct.newBuilder() + .putFields( + "TRAFFICDIRECTOR_NETWORK_HOSTNAME", + Value.newBuilder().setStringValue("default").build()) + .putFields( + LoadReportClient.TARGET_NAME_METADATA_KEY, + Value.newBuilder().setStringValue(TARGET_NAME).build()))) + .build(); } /** * Generates a raw service load stats report with random data. */ - private static ClusterStats generateClusterLoadStats(String clusterName) { + private static ClusterStats generateClusterLoadStats( + String clusterName, @Nullable String clusterServiceName) { long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long callsSucceeded = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long callsFailed = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); @@ -432,30 +454,32 @@ public class LoadReportClientTest { long numLbDrops = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long numThrottleDrops = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); - return - ClusterStats.newBuilder() - .setClusterName(clusterName) - .addUpstreamLocalityStats( - UpstreamLocalityStats.newBuilder() - .setLocality( - Locality.newBuilder() - .setRegion("region-foo") - .setZone("zone-bar") - .setSubZone("subzone-baz")) - .setTotalRequestsInProgress(callsInProgress) - .setTotalSuccessfulRequests(callsSucceeded) - .setTotalErrorRequests(callsFailed) - .setTotalIssuedRequests(callsIssued)) - .addDroppedRequests( - DroppedRequests.newBuilder() - .setCategory("lb") - .setDroppedCount(numLbDrops)) - .addDroppedRequests( - DroppedRequests.newBuilder() - .setCategory("throttle") - .setDroppedCount(numThrottleDrops)) - .setTotalDroppedRequests(numLbDrops + numThrottleDrops) - .build(); + ClusterStats.Builder clusterStatsBuilder = ClusterStats.newBuilder(); + clusterStatsBuilder.setClusterName(clusterName); + if (clusterServiceName != null) { + clusterStatsBuilder.setClusterServiceName(clusterServiceName); + } + clusterStatsBuilder.addUpstreamLocalityStats( + UpstreamLocalityStats.newBuilder() + .setLocality( + Locality.newBuilder() + .setRegion("region-foo") + .setZone("zone-bar") + .setSubZone("subzone-baz")) + .setTotalRequestsInProgress(callsInProgress) + .setTotalSuccessfulRequests(callsSucceeded) + .setTotalErrorRequests(callsFailed) + .setTotalIssuedRequests(callsIssued)) + .addDroppedRequests( + DroppedRequests.newBuilder() + .setCategory("lb") + .setDroppedCount(numLbDrops)) + .addDroppedRequests( + DroppedRequests.newBuilder() + .setCategory("throttle") + .setDroppedCount(numThrottleDrops)) + .setTotalDroppedRequests(numLbDrops + numThrottleDrops); + return clusterStatsBuilder.build(); } /** @@ -476,6 +500,11 @@ public class LoadReportClientTest { @Override public boolean matches(LoadStatsRequest argument) { + if (!argument.getNode().getMetadata() + .getFieldsOrThrow(LoadReportClient.TARGET_NAME_METADATA_KEY) + .getStringValue().equals(TARGET_NAME)) { + return false; + } if (argument.getClusterStatsCount() != expectedStats.size()) { return false; } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index 907f57f467..e2b75e01c4 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -45,6 +45,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; +import com.google.protobuf.util.Durations; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; import io.envoyproxy.envoy.api.v2.DiscoveryRequest; @@ -122,6 +123,7 @@ import org.mockito.MockitoAnnotations; @RunWith(JUnit4.class) public class XdsClientImplTest { + private static final String TARGET_NAME = "foo.googleapis.com:8080"; private static final String HOSTNAME = "foo.googleapis.com"; private static final int PORT = 8080; @@ -250,7 +252,6 @@ public class XdsClientImplTest { new CancellationListener() { @Override public void cancelled(Context context) { - call.cancelled = true; lrsEnded.set(true); } }, MoreExecutors.directExecutor()); @@ -282,8 +283,14 @@ public class XdsClientImplTest { }; xdsClient = - new XdsClientImpl(servers, channelFactory, NODE, syncContext, - fakeClock.getScheduledExecutorService(), backoffPolicyProvider, + new XdsClientImpl( + TARGET_NAME, + servers, + channelFactory, + NODE, + syncContext, + fakeClock.getScheduledExecutorService(), + backoffPolicyProvider, fakeClock.getStopwatchSupplier()); // Only the connection to management server is established, no RPC request is sent until at // least one watcher is registered. @@ -1266,10 +1273,9 @@ public class XdsClientImplTest { verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture()); ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue(); assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate.getEdsServiceName()) - .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate.getEdsServiceName()).isNull(); assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate.isEnableLrs()).isEqualTo(false); + assertThat(clusterUpdate.getLrsServerName()).isNull(); // Management server sends back another CDS response updating the requested Cluster. clusters = ImmutableList.of( @@ -1292,8 +1298,7 @@ public class XdsClientImplTest { assertThat(clusterUpdate.getEdsServiceName()) .isEqualTo("eds-cluster-foo.googleapis.com"); assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate.isEnableLrs()).isTrue(); - assertThat(clusterUpdate.getLrsServerName()).isEmpty(); + assertThat(clusterUpdate.getLrsServerName()).isEqualTo(""); } /** @@ -1371,20 +1376,18 @@ public class XdsClientImplTest { ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getEdsServiceName()) - .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getEdsServiceName()).isNull(); assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + assertThat(clusterUpdate1.getLrsServerName()).isNull(); ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate2.getEdsServiceName()) - .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate2.getEdsServiceName()).isNull(); assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false); + assertThat(clusterUpdate2.getLrsServerName()).isNull(); verify(watcher3, never()).onClusterChanged(any(ClusterUpdate.class)); verify(watcher3, never()).onError(any(Status.class)); @@ -1421,20 +1424,18 @@ public class XdsClientImplTest { clusterUpdate1 = clusterUpdateCaptor1.getValue(); assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getEdsServiceName()) - .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getEdsServiceName()).isNull(); assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + assertThat(clusterUpdate1.getLrsServerName()).isNull(); clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); verify(watcher2, times(2)).onClusterChanged(clusterUpdateCaptor2.capture()); clusterUpdate2 = clusterUpdateCaptor2.getValue(); assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate2.getEdsServiceName()) - .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate2.getEdsServiceName()).isNull(); assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false); + assertThat(clusterUpdate2.getLrsServerName()).isNull(); ArgumentCaptor clusterUpdateCaptor3 = ArgumentCaptor.forClass(null); verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture()); @@ -1443,8 +1444,7 @@ public class XdsClientImplTest { assertThat(clusterUpdate3.getEdsServiceName()) .isEqualTo("eds-cluster-bar.googleapis.com"); assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true); - assertThat(clusterUpdate3.getLrsServerName()).isEmpty(); + assertThat(clusterUpdate3.getLrsServerName()).isEqualTo(""); } /** @@ -1484,10 +1484,9 @@ public class XdsClientImplTest { verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getEdsServiceName()) - .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getEdsServiceName()).isNull(); assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + assertThat(clusterUpdate1.getLrsServerName()).isNull(); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); // Another cluster watcher interested in the same cluster is added. @@ -1500,10 +1499,9 @@ public class XdsClientImplTest { verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue(); assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate2.getEdsServiceName()) - .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate2.getEdsServiceName()).isNull(); assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false); + assertThat(clusterUpdate2.getLrsServerName()).isNull(); verifyNoMoreInteractions(requestObserver); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); @@ -1543,10 +1541,9 @@ public class XdsClientImplTest { verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture()); ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue(); assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getEdsServiceName()) - .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getEdsServiceName()).isNull(); assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + assertThat(clusterUpdate1.getLrsServerName()).isNull(); // Add another cluster watcher for a different cluster. ClusterWatcher watcher2 = mock(ClusterWatcher.class); @@ -1581,10 +1578,9 @@ public class XdsClientImplTest { verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture()); clusterUpdate1 = clusterUpdateCaptor1.getValue(); assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate1.getEdsServiceName()) - .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate1.getEdsServiceName()).isNull(); assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false); + assertThat(clusterUpdate1.getLrsServerName()).isNull(); ArgumentCaptor clusterUpdateCaptor2 = ArgumentCaptor.forClass(null); verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture()); @@ -1593,8 +1589,7 @@ public class XdsClientImplTest { assertThat(clusterUpdate2.getEdsServiceName()) .isEqualTo("eds-cluster-bar.googleapis.com"); assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(true); - assertThat(clusterUpdate2.getLrsServerName()).isEmpty(); + assertThat(clusterUpdate2.getLrsServerName()).isEqualTo(""); // Cancel one of the watcher. xdsClient.cancelClusterDataWatch("cluster-foo.googleapis.com", watcher1); @@ -1660,11 +1655,9 @@ public class XdsClientImplTest { verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture()); ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue(); assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate3.getEdsServiceName()) - .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate3.getEdsServiceName()).isNull(); assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true); - assertThat(clusterUpdate2.getLrsServerName()).isEmpty(); + assertThat(clusterUpdate2.getLrsServerName()).isEqualTo(""); verifyNoMoreInteractions(watcher1, watcher2); @@ -1768,11 +1761,9 @@ public class XdsClientImplTest { verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture()); ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue(); assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com"); - assertThat(clusterUpdate.getEdsServiceName()) - .isEqualTo("cluster-foo.googleapis.com"); // default to cluster name + assertThat(clusterUpdate.getEdsServiceName()).isNull(); assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin"); - assertThat(clusterUpdate.isEnableLrs()).isEqualTo(true); - assertThat(clusterUpdate.getLrsServerName()).isEmpty(); + assertThat(clusterUpdate.getLrsServerName()).isEqualTo(""); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); // No cluster is available. @@ -3097,14 +3088,31 @@ public class XdsClientImplTest { */ @Test public void reportLoadStatsToServer() { - LoadStatsStore loadStatsStore = mock(LoadStatsStore.class); String clusterName = "cluster-foo.googleapis.com"; + LoadStatsStore loadStatsStore = new LoadStatsStoreImpl(clusterName, null); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(null); xdsClient.reportClientStats(clusterName, null, loadStatsStore); LoadReportCall lrsCall = loadReportCalls.poll(); - verify(lrsCall.requestObserver).onNext(eq(buildInitialLoadStatsRequest(clusterName))); + verify(lrsCall.requestObserver).onNext(requestCaptor.capture()); + assertThat(requestCaptor.getValue().getClusterStatsCount()) + .isEqualTo(0); // initial request + + lrsCall.responseObserver.onNext( + LoadStatsResponse.newBuilder() + .addClusters(clusterName) + .setLoadReportingInterval(Durations.fromNanos(1000L)) + .build()); + fakeClock.forwardNanos(1000L); + verify(lrsCall.requestObserver, times(2)).onNext(requestCaptor.capture()); + ClusterStats report = Iterables.getOnlyElement(requestCaptor.getValue().getClusterStatsList()); + assertThat(report.getClusterName()).isEqualTo(clusterName); xdsClient.cancelClientStatsReport(clusterName, null); - assertThat(lrsCall.cancelled).isTrue(); + fakeClock.forwardNanos(1000L); + verify(lrsCall.requestObserver, times(3)).onNext(requestCaptor.capture()); + assertThat(requestCaptor.getValue().getClusterStatsCount()) + .isEqualTo(0); // no more stats reported + // See more test on LoadReportClientTest.java } @@ -3536,14 +3544,6 @@ public class XdsClientImplTest { assertThat(res).isEqualTo(expectedString); } - private static LoadStatsRequest buildInitialLoadStatsRequest(String clusterName) { - return - LoadStatsRequest.newBuilder() - .setNode(NODE) - .addClusterStats(ClusterStats.newBuilder().setClusterName(clusterName)) - .build(); - } - /** * Matcher for DiscoveryRequest without the comparison of error_details field, which is used for * management server debugging purposes. @@ -3593,7 +3593,6 @@ public class XdsClientImplTest { private final StreamObserver requestObserver; @SuppressWarnings("unused") private final StreamObserver responseObserver; - private boolean cancelled; LoadReportCall(StreamObserver requestObserver, StreamObserver responseObserver) { diff --git a/xds/src/test/java/io/grpc/xds/XdsClientTest.java b/xds/src/test/java/io/grpc/xds/XdsClientTest.java index 70cc3ef253..56fca6dc09 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientTest.java @@ -21,7 +21,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import io.grpc.xds.XdsClient.ClusterUpdate; import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool; import io.grpc.xds.XdsClient.XdsClientFactory; import org.junit.Rule; @@ -38,24 +37,6 @@ public class XdsClientTest { @Rule public final ExpectedException thrown = ExpectedException.none(); - @Test - public void buildClusterUpdate_defaultToClusterNameWhenEdsServiceNameNotSet() { - ClusterUpdate clusterUpdate1 = - ClusterUpdate.newBuilder() - .setClusterName("foo.googleapis.com") - .setEdsServiceName("bar.googleapis.com") - .setLbPolicy("round_robin") - .build(); - assertThat(clusterUpdate1.getEdsServiceName()).isEqualTo("bar.googleapis.com"); - - ClusterUpdate clusterUpdate2 = - ClusterUpdate.newBuilder() - .setClusterName("foo.googleapis.com") - .setLbPolicy("round_robin") - .build(); - assertThat(clusterUpdate2.getEdsServiceName()).isEqualTo("foo.googleapis.com"); - } - @Test public void refCountedXdsClientObjectPool_getObjectShouldMatchReturnObject() { XdsClientFactory xdsClientFactory = new XdsClientFactory() { diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerProviderTest.java b/xds/src/test/java/io/grpc/xds/XdsLoadBalancerProviderTest.java index 2e83b9e583..e4e8c1987b 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerProviderTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsLoadBalancerProviderTest.java @@ -155,6 +155,7 @@ public class XdsLoadBalancerProviderTest { @Test public void parseLoadBalancingConfigPolicy() throws Exception { String rawLbConfig = "{" + + "\"cluster\" : \"foo.googleapis.com\"," + "\"childPolicy\" : [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"supported_1\" : {}}]," + "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"round_robin\" : {\"key\" : \"val\"}}," + "{\"supported_2\" : {\"key\" : \"val\"}}]," @@ -169,6 +170,7 @@ public class XdsLoadBalancerProviderTest { assertThat(configOrError.getConfig()).isInstanceOf(XdsConfig.class); assertThat(configOrError.getConfig()).isEqualTo( new XdsConfig( + "foo.googleapis.com", ServiceConfigUtil.unwrapLoadBalancingConfig( checkObject(JsonParser.parse("{\"supported_1\" : {}}"))), ServiceConfigUtil.unwrapLoadBalancingConfig(