xds: update LRS protocol and fix usage of cluster name in ClusterStats (#6737)

Fixes load reporting integration due to LRS design flaws.

- Updated LRS protocol. The Node sent in LRS requests use a special metadata "PROXYLESS_CLIENT_HOSTNAME" with value being the hostname (including port) for creating the gRPC channel. Management server is able to infer clusters that the gRPC client potentially sends load to. LRS initial request does not need to populate clusters it wants to report load for.

- Each ClusterStats message in LRS requests represents the loads for each (cluster, cluster_service), where cluster_service field is optional. EDS LB policy should track loads per (cluster, cluster_service) and populate cluster name from upstream CDS policy.

- Modified CdsUpdate, which is the converted data of a CDS response. edsServiceName field can be null when an CDS response does not give it. We want to preserve the null value for LRS requests.
This commit is contained in:
Chengyuan Zhang 2020-02-21 15:32:27 -08:00 committed by GitHub
parent abed707385
commit a98db126e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 339 additions and 319 deletions

View File

@ -280,11 +280,13 @@ public final class CdsLoadBalancer extends LoadBalancer {
newUpdate.getLbPolicy().equals("round_robin"),
"The load balancing policy in ClusterUpdate '%s' is not supported", newUpdate);
final XdsConfig edsConfig = new XdsConfig(
new LbConfig(newUpdate.getLbPolicy(), ImmutableMap.<String, Object>of()),
/* fallbackPolicy = */ null,
/* edsServiceName = */ newUpdate.getEdsServiceName(),
/* lrsServerName = */ newUpdate.getLrsServerName());
final XdsConfig edsConfig =
new XdsConfig(
/* cluster = */ newUpdate.getClusterName(),
new LbConfig(newUpdate.getLbPolicy(), ImmutableMap.<String, Object>of()),
/* fallbackPolicy = */ null,
/* edsServiceName = */ newUpdate.getEdsServiceName(),
/* lrsServerName = */ newUpdate.getLrsServerName());
updateSslContextProvider(newUpdate.getUpstreamTlsContext());
if (edsBalancer == null) {
edsBalancer = lbRegistry.getProvider(EDS_POLICY_NAME).newLoadBalancer(helper);

View File

@ -72,9 +72,6 @@ final class EdsLoadBalancer extends LoadBalancer {
private XdsClient xdsClient;
@Nullable
private String clusterName;
// FIXME(chengyuanzhang): should be one instance per cluster:cluster_service.
@Nullable
private LoadStatsStore loadStatsStore;
EdsLoadBalancer(Helper edsLbHelper, ResourceUpdateCallback resourceUpdateCallback) {
this(
@ -159,6 +156,7 @@ final class EdsLoadBalancer extends LoadBalancer {
XdsClient createXdsClient() {
return
new XdsClientImpl(
edsLbHelper.getAuthority(),
serverList,
channelFactory,
node,
@ -173,41 +171,15 @@ final class EdsLoadBalancer extends LoadBalancer {
xdsClient = xdsClientPool.getObject();
}
// The edsServiceName field is null in legacy gRPC client with EDS: use target authority for
// querying endpoints, but in the future we expect this to be explicitly given by EDS config.
// We assume if edsServiceName is null, it will always be null in later resolver updates;
// and if edsServiceName is not null, it will always be not null.
String clusterServiceName = newXdsConfig.edsServiceName;
if (clusterServiceName == null) {
clusterServiceName = edsLbHelper.getAuthority();
}
if (clusterName == null) {
// TODO(zdapeng): Use the correct cluster name. Currently load reporting will be broken if
// edsServiceName is changed because we are using edsServiceName for the cluster name.
clusterName = clusterServiceName;
loadStatsStore = new LoadStatsStoreImpl(clusterName, null);
}
// FIXME(chengyuanzhang): should report loads for each cluster:cluster_service.
if (xdsConfig == null
|| !Objects.equals(newXdsConfig.lrsServerName, xdsConfig.lrsServerName)) {
if (newXdsConfig.lrsServerName != null) {
if (!newXdsConfig.lrsServerName.equals("")) {
throw new AssertionError(
"Can only report load to the same management server");
}
xdsClient.reportClientStats(clusterName, null, loadStatsStore);
} else if (xdsConfig != null) {
xdsClient.cancelClientStatsReport(clusterName, null);
}
}
// FIXME(chengyuanzhang): make cluster name required in XdsConfig.
clusterName = newXdsConfig.cluster != null ? newXdsConfig.cluster : edsLbHelper.getAuthority();
// Note: childPolicy change will be handled in LocalityStore, to be implemented.
// If edsServiceName in XdsConfig is changed, do a graceful switch.
if (xdsConfig == null
|| !Objects.equals(newXdsConfig.edsServiceName, xdsConfig.edsServiceName)) {
LoadBalancer.Factory clusterEndpointsLoadBalancerFactory =
new ClusterEndpointsBalancerFactory(clusterServiceName);
new ClusterEndpointsBalancerFactory(newXdsConfig.edsServiceName);
switchingLoadBalancer.switchTo(clusterEndpointsLoadBalancerFactory);
}
switchingLoadBalancer.handleResolvedAddresses(resolvedAddresses);
@ -232,9 +204,6 @@ final class EdsLoadBalancer extends LoadBalancer {
channelLogger.log(ChannelLogLevel.DEBUG, "EDS load balancer is shutting down");
switchingLoadBalancer.shutdown();
if (xdsClient != null) {
if (xdsConfig != null && xdsConfig.lrsServerName != null) {
xdsClient.cancelClientStatsReport(clusterName, null);
}
xdsClient = xdsClientPool.returnObject(xdsClient);
}
}
@ -243,10 +212,12 @@ final class EdsLoadBalancer extends LoadBalancer {
* A load balancer factory that provides a load balancer for a given cluster service.
*/
private final class ClusterEndpointsBalancerFactory extends LoadBalancer.Factory {
final String clusterServiceName;
@Nullable final String clusterServiceName;
final LoadStatsStore loadStatsStore;
ClusterEndpointsBalancerFactory(String clusterServiceName) {
ClusterEndpointsBalancerFactory(@Nullable String clusterServiceName) {
this.clusterServiceName = clusterServiceName;
loadStatsStore = new LoadStatsStoreImpl(clusterName, clusterServiceName);
}
@Override
@ -260,7 +231,7 @@ final class EdsLoadBalancer extends LoadBalancer {
return false;
}
ClusterEndpointsBalancerFactory that = (ClusterEndpointsBalancerFactory) o;
return clusterServiceName.equals(that.clusterServiceName);
return Objects.equals(clusterServiceName, that.clusterServiceName);
}
@Override
@ -272,20 +243,41 @@ final class EdsLoadBalancer extends LoadBalancer {
* Load-balances endpoints for a given cluster.
*/
final class ClusterEndpointsBalancer extends LoadBalancer {
// Name of the resource to be used for querying endpoint information.
final String resourceName;
final Helper helper;
final EndpointWatcherImpl endpointWatcher;
final LocalityStore localityStore;
boolean isReportingLoad;
ClusterEndpointsBalancer(Helper helper) {
this.helper = helper;
resourceName = clusterServiceName != null ? clusterServiceName : clusterName;
localityStore = localityStoreFactory.newLocalityStore(helper, lbRegistry, loadStatsStore);
endpointWatcher = new EndpointWatcherImpl(localityStore);
xdsClient.watchEndpointData(clusterServiceName, endpointWatcher);
xdsClient.watchEndpointData(resourceName, endpointWatcher);
}
// TODO(zddapeng): In handleResolvedAddresses() handle child policy change if any.
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
XdsConfig config = (XdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
if (config.lrsServerName != null) {
if (!config.lrsServerName.equals("")) {
throw new AssertionError(
"Can only report load to the same management server");
}
if (!isReportingLoad) {
xdsClient.reportClientStats(clusterName, clusterServiceName, loadStatsStore);
isReportingLoad = true;
}
} else {
if (isReportingLoad) {
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName);
isReportingLoad = false;
}
}
// TODO(zddapeng): In handleResolvedAddresses() handle child policy change if any.
}
@Override
public void handleNameResolutionError(Status error) {
@ -303,8 +295,12 @@ final class EdsLoadBalancer extends LoadBalancer {
@Override
public void shutdown() {
if (isReportingLoad) {
xdsClient.cancelClientStatsReport(clusterName, clusterServiceName);
isReportingLoad = false;
}
localityStore.reset();
xdsClient.cancelEndpointDataWatch(clusterServiceName, endpointWatcher);
xdsClient.cancelEndpointDataWatch(resourceName, endpointWatcher);
}
}
}

View File

@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
@ -35,10 +37,8 @@ import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
@ -56,6 +56,9 @@ import javax.annotation.concurrent.NotThreadSafe;
final class LoadReportClient {
private static final Logger logger = Logger.getLogger(XdsClientImpl.class.getName());
@VisibleForTesting
static final String TARGET_NAME_METADATA_KEY = "PROXYLESS_CLIENT_HOSTNAME";
private final ManagedChannel channel;
private final Node node;
private final SynchronizationContext syncContext;
@ -64,10 +67,8 @@ final class LoadReportClient {
private final Stopwatch retryStopwatch;
private final BackoffPolicy.Provider backoffPolicyProvider;
// Sources of load stats data for each cluster.
// FIXME(chengyuanzhang): this should be Map<String, Map<String, LoadStatsStore>> as each
// ClusterStats is keyed by cluster:cluster_service. Currently, cluster_service is always unset.
private final Map<String, LoadStatsStore> loadStatsStoreMap = new HashMap<>();
// Sources of load stats data for each cluster:cluster_service.
private final Map<String, Map<String, LoadStatsStore>> loadStatsStoreMap = new HashMap<>();
private boolean started;
@Nullable
@ -80,6 +81,7 @@ final class LoadReportClient {
private LoadReportCallback callback;
LoadReportClient(
String targetName,
ManagedChannel channel,
Node node,
SynchronizationContext syncContext,
@ -87,13 +89,21 @@ final class LoadReportClient {
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
this.channel = checkNotNull(channel, "channel");
this.node = checkNotNull(node, "node");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timerService = checkNotNull(scheduledExecutorService, "timeService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.retryStopwatch = stopwatchSupplier.get();
started = false;
checkNotNull(targetName, "targetName");
checkNotNull(node, "node");
Struct metadata =
node.getMetadata()
.toBuilder()
.putFields(
TARGET_NAME_METADATA_KEY,
Value.newBuilder().setStringValue(targetName).build())
.build();
this.node = node.toBuilder().setMetadata(metadata).build();
}
/**
@ -101,7 +111,7 @@ final class LoadReportClient {
* stats periodically. Calling this method on an already started {@link LoadReportClient} is
* no-op.
*/
public void startLoadReporting(LoadReportCallback callback) {
void startLoadReporting(LoadReportCallback callback) {
if (started) {
return;
}
@ -114,7 +124,7 @@ final class LoadReportClient {
* Terminates load reporting. Calling this method on an already stopped
* {@link LoadReportClient} is no-op.
*/
public void stopLoadReporting() {
void stopLoadReporting() {
if (!started) {
return;
}
@ -132,37 +142,35 @@ final class LoadReportClient {
* Provides this LoadReportClient source of load stats data for the given
* cluster:cluster_service. If requested, data from the given loadStatsStore is
* periodically queried and sent to traffic director by this LoadReportClient.
*
* <p>Currently we expect load stats data for all clusters to report loads for are provided
* before load reporting starts (so that LRS initial request tells management server clusters
* it is reporting loads for). Design TBD for reporting loads for extra clusters after load
* reporting has started.
*
* <p>Note: currently clusterServiceName is always unset.
*/
public void addLoadStatsStore(
void addLoadStatsStore(
String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) {
checkState(
!loadStatsStoreMap.containsKey(clusterName),
"load stats for cluster " + clusterName + " already exists");
// FIXME(chengyuanzhang): relax this restriction after design is fleshed out.
checkState(
!started,
"load stats for all clusters to report loads for should be provided before "
+ "load reporting has started");
loadStatsStoreMap.put(clusterName, loadStatsStore);
!loadStatsStoreMap.containsKey(clusterName)
|| !loadStatsStoreMap.get(clusterName).containsKey(clusterServiceName),
"load stats for cluster: %s, cluster service: %s already exists",
clusterName, clusterServiceName);
if (!loadStatsStoreMap.containsKey(clusterName)) {
loadStatsStoreMap.put(clusterName, new HashMap<String, LoadStatsStore>());
}
Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(clusterName);
clusterLoadStatsStores.put(clusterServiceName, loadStatsStore);
}
/**
* Stops providing load stats data for the given cluster:cluster_service.
*
* <p>Note: currently clusterServiceName is always unset.
*/
public void removeLoadStatsStore(String clusterName, @Nullable String clusterServiceName) {
void removeLoadStatsStore(String clusterName, @Nullable String clusterServiceName) {
checkState(
loadStatsStoreMap.containsKey(clusterName),
"load stats for cluster " + clusterName + " does not exist");
loadStatsStoreMap.remove(clusterName);
loadStatsStoreMap.containsKey(clusterName)
&& loadStatsStoreMap.get(clusterName).containsKey(clusterServiceName),
"load stats for cluster: %s, cluster service: %s does not exist",
clusterName, clusterServiceName);
Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(clusterName);
clusterLoadStatsStores.remove(clusterServiceName);
if (clusterLoadStatsStores.isEmpty()) {
loadStatsStoreMap.remove(clusterName);
}
}
@VisibleForTesting
@ -217,15 +225,12 @@ final class LoadReportClient {
void start() {
lrsRequestWriter = stub.withWaitForReady().streamLoadStats(this);
reportStopwatch.reset().start();
// Tells management server which clusters the client is reporting loads for.
List<ClusterStats> clusterStatsList = new ArrayList<>();
for (String clusterName : loadStatsStoreMap.keySet()) {
clusterStatsList.add(ClusterStats.newBuilder().setClusterName(clusterName).build());
}
// Send an initial LRS request with empty cluster stats. Management server is able to
// infer clusters the gRPC client sending loads to.
LoadStatsRequest initRequest =
LoadStatsRequest.newBuilder()
.setNode(node)
.addAllClusterStats(clusterStatsList)
.build();
lrsRequestWriter.onNext(initRequest);
logger.log(Level.FINE, "Initial LRS request sent: {0}", initRequest);
@ -269,13 +274,15 @@ final class LoadReportClient {
LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node);
for (String name : clusterNames) {
if (loadStatsStoreMap.containsKey(name)) {
LoadStatsStore loadStatsStore = loadStatsStoreMap.get(name);
ClusterStats report =
loadStatsStore.generateLoadReport()
.toBuilder()
.setLoadReportInterval(Durations.fromNanos(interval))
.build();
requestBuilder.addClusterStats(report);
Map<String, LoadStatsStore> clusterLoadStatsStores = loadStatsStoreMap.get(name);
for (LoadStatsStore statsStore : clusterLoadStatsStores.values()) {
ClusterStats report =
statsStore.generateLoadReport()
.toBuilder()
.setLoadReportInterval(Durations.fromNanos(interval))
.build();
requestBuilder.addClusterStats(report);
}
}
}
LoadStatsRequest request = requestBuilder.build();
@ -308,10 +315,16 @@ final class LoadReportClient {
} else {
logger.log(Level.FINE, "Received an LRS response: {0}", response);
}
loadReportIntervalNano = Durations.toNanos(response.getLoadReportingInterval());
callback.onReportResponse(loadReportIntervalNano);
clusterNames.clear();
clusterNames.addAll(response.getClustersList());
long interval = Durations.toNanos(response.getLoadReportingInterval());
if (interval != loadReportIntervalNano) {
loadReportIntervalNano = interval;
callback.onReportResponse(loadReportIntervalNano);
}
if (clusterNames.size() != response.getClustersCount()
|| !clusterNames.containsAll(response.getClustersList())) {
clusterNames.clear();
clusterNames.addAll(response.getClustersList());
}
scheduleNextLoadReport();
}

View File

@ -112,19 +112,22 @@ abstract class XdsClient {
*/
static final class ClusterUpdate {
private final String clusterName;
@Nullable
private final String edsServiceName;
private final String lbPolicy;
private final boolean enableLrs;
@Nullable
private final String lrsServerName;
private final UpstreamTlsContext upstreamTlsContext;
private ClusterUpdate(String clusterName, String edsServiceName, String lbPolicy,
boolean enableLrs, @Nullable String lrsServerName,
private ClusterUpdate(
String clusterName,
@Nullable String edsServiceName,
String lbPolicy,
@Nullable String lrsServerName,
@Nullable UpstreamTlsContext upstreamTlsContext) {
this.clusterName = clusterName;
this.edsServiceName = edsServiceName;
this.lbPolicy = lbPolicy;
this.enableLrs = enableLrs;
this.lrsServerName = lrsServerName;
this.upstreamTlsContext = upstreamTlsContext;
}
@ -136,6 +139,7 @@ abstract class XdsClient {
/**
* Returns the resource name for EDS requests.
*/
@Nullable
String getEdsServiceName() {
return edsServiceName;
}
@ -148,16 +152,9 @@ abstract class XdsClient {
return lbPolicy;
}
/**
* Returns true if LRS is enabled.
*/
boolean isEnableLrs() {
return enableLrs;
}
/**
* Returns the server name to send client load reports to if LRS is enabled. {@code null} if
* {@link #isEnableLrs()} returns {@code false}.
* load reporting is disabled for this cluster.
*/
@Nullable
String getLrsServerName() {
@ -176,9 +173,9 @@ abstract class XdsClient {
static final class Builder {
private String clusterName;
@Nullable
private String edsServiceName;
private String lbPolicy;
private boolean enableLrs;
@Nullable
private String lrsServerName;
@Nullable
@ -203,11 +200,6 @@ abstract class XdsClient {
return this;
}
Builder setEnableLrs(boolean enableLrs) {
this.enableLrs = enableLrs;
return this;
}
Builder setLrsServerName(String lrsServerName) {
this.lrsServerName = lrsServerName;
return this;
@ -221,13 +213,10 @@ abstract class XdsClient {
ClusterUpdate build() {
Preconditions.checkState(clusterName != null, "clusterName is not set");
Preconditions.checkState(lbPolicy != null, "lbPolicy is not set");
Preconditions.checkState(
(enableLrs && lrsServerName != null) || (!enableLrs && lrsServerName == null),
"lrsServerName is not set while LRS is enabled "
+ "OR lrsServerName is set while LRS is not enabled");
return
new ClusterUpdate(clusterName, edsServiceName == null ? clusterName : edsServiceName,
lbPolicy, enableLrs, lrsServerName, upstreamTlsContext);
new ClusterUpdate(
clusterName, edsServiceName, lbPolicy, lrsServerName, upstreamTlsContext);
}
}
}

View File

@ -88,6 +88,8 @@ final class XdsClientImpl extends XdsClient {
private final MessagePrinter respPrinter = new MessagePrinter();
// Name of the target server this gRPC client is trying to talk to.
private final String targetName;
private final ManagedChannel channel;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
@ -162,6 +164,7 @@ final class XdsClientImpl extends XdsClient {
private String ldsResourceName;
XdsClientImpl(
String targetName,
List<ServerInfo> servers, // list of management servers
XdsChannelFactory channelFactory,
Node node,
@ -169,6 +172,7 @@ final class XdsClientImpl extends XdsClient {
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
this.targetName = checkNotNull(targetName, "targetName");
this.channel =
checkNotNull(channelFactory, "channelFactory")
.createChannel(checkNotNull(servers, "servers"));
@ -407,29 +411,30 @@ final class XdsClientImpl extends XdsClient {
@Override
void reportClientStats(
String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) {
checkState(lrsClient == null,
"load reporting has already started, cannot change clusters to report loads for");
lrsClient =
new LoadReportClient(
channel,
node,
syncContext,
timeService,
backoffPolicyProvider,
stopwatchSupplier);
if (lrsClient == null) {
lrsClient =
new LoadReportClient(
targetName,
channel,
node,
syncContext,
timeService,
backoffPolicyProvider,
stopwatchSupplier);
lrsClient.startLoadReporting(new LoadReportCallback() {
@Override
public void onReportResponse(long reportIntervalNano) {}
});
}
lrsClient.addLoadStatsStore(clusterName, clusterServiceName, loadStatsStore);
lrsClient.startLoadReporting(new LoadReportCallback() {
@Override
public void onReportResponse(long reportIntervalNano) {}
});
}
@Override
void cancelClientStatsReport(String clusterName, @Nullable String clusterServiceName) {
checkState(lrsClient != null, "load reporting was never started");
lrsClient.removeLoadStatsStore(clusterName, clusterServiceName);
lrsClient.stopLoadReporting();
lrsClient = null;
// TODO(chengyuanzhang): can be optimized to stop load reporting if no more loads need
// to be reported.
}
/**
@ -740,8 +745,7 @@ final class XdsClientImpl extends XdsClient {
+ "indicate to use EDS over ADS.";
break;
}
// If the service_name field is set, that value will be used for the EDS request
// instead of the cluster name (default).
// If the service_name field is set, that value will be used for the EDS request.
if (!edsClusterConfig.getServiceName().isEmpty()) {
updateBuilder.setEdsServiceName(edsClusterConfig.getServiceName());
edsServices.add(edsClusterConfig.getServiceName());
@ -764,10 +768,7 @@ final class XdsClientImpl extends XdsClient {
+ "management server.";
break;
}
updateBuilder.setEnableLrs(true);
updateBuilder.setLrsServerName("");
} else {
updateBuilder.setEnableLrs(false);
}
if (cluster.hasTlsContext()) {
updateBuilder.setUpstreamTlsContext(cluster.getTlsContext());

View File

@ -77,13 +77,14 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider {
static ConfigOrError parseLoadBalancingConfigPolicy(
Map<String, ?> rawLoadBalancingPolicyConfig, LoadBalancerRegistry registry) {
try {
String cluster = JsonUtil.getString(rawLoadBalancingPolicyConfig, "cluster");
LbConfig childPolicy = selectChildPolicy(rawLoadBalancingPolicyConfig, registry);
LbConfig fallbackPolicy = selectFallbackPolicy(rawLoadBalancingPolicyConfig, registry);
String edsServiceName = JsonUtil.getString(rawLoadBalancingPolicyConfig, "edsServiceName");
String lrsServerName =
JsonUtil.getString(rawLoadBalancingPolicyConfig, "lrsLoadReportingServerName");
return ConfigOrError.fromConfig(
new XdsConfig(childPolicy, fallbackPolicy, edsServiceName, lrsServerName));
new XdsConfig(cluster, childPolicy, fallbackPolicy, edsServiceName, lrsServerName));
} catch (RuntimeException e) {
return ConfigOrError.fromError(
Status.fromThrowable(e).withDescription(
@ -128,6 +129,9 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider {
* Represents a successfully parsed and validated LoadBalancingConfig for XDS.
*/
static final class XdsConfig {
// FIXME(chengyuanzhang): make cluster name required.
@Nullable
final String cluster;
// TODO(carl-mastrangelo): make these Object's containing the fully parsed child configs.
@Nullable
final LbConfig childPolicy;
@ -144,10 +148,12 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider {
final String lrsServerName;
XdsConfig(
@Nullable String cluster,
@Nullable LbConfig childPolicy,
@Nullable LbConfig fallbackPolicy,
@Nullable String edsServiceName,
@Nullable String lrsServerName) {
this.cluster = cluster;
this.childPolicy = childPolicy;
this.fallbackPolicy = fallbackPolicy;
this.edsServiceName = edsServiceName;
@ -157,6 +163,7 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("cluster", cluster)
.add("childPolicy", childPolicy)
.add("fallbackPolicy", fallbackPolicy)
.add("edsServiceName", edsServiceName)
@ -170,7 +177,8 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider {
return false;
}
XdsConfig that = (XdsConfig) obj;
return Objects.equal(this.childPolicy, that.childPolicy)
return Objects.equal(this.cluster, that.cluster)
&& Objects.equal(this.childPolicy, that.childPolicy)
&& Objects.equal(this.fallbackPolicy, that.fallbackPolicy)
&& Objects.equal(this.edsServiceName, that.edsServiceName)
&& Objects.equal(this.lrsServerName, that.lrsServerName);
@ -178,7 +186,7 @@ public final class XdsLoadBalancerProvider extends LoadBalancerProvider {
@Override
public int hashCode() {
return Objects.hashCode(childPolicy, fallbackPolicy, edsServiceName, lrsServerName);
return Objects.hashCode(cluster, childPolicy, fallbackPolicy, edsServiceName, lrsServerName);
}
}
}

View File

@ -123,6 +123,7 @@ final class XdsNameResolver extends NameResolver {
XdsClient createXdsClient() {
return
new XdsClientImpl(
authority,
serverList,
channelFactory,
node,

View File

@ -198,7 +198,6 @@ public class CdsLoadBalancerTest {
.setClusterName("foo.googleapis.com")
.setEdsServiceName("edsServiceFoo.googleapis.com")
.setLbPolicy("round_robin")
.setEnableLrs(false)
.build());
verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
@ -230,7 +229,6 @@ public class CdsLoadBalancerTest {
.setClusterName("foo.googleapis.com")
.setEdsServiceName("edsServiceFoo.googleapis.com")
.setLbPolicy("round_robin")
.setEnableLrs(false)
.build());
assertThat(edsLbHelpers).hasSize(1);
@ -240,6 +238,7 @@ public class CdsLoadBalancerTest {
ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor1 = ArgumentCaptor.forClass(null);
verify(edsLoadBalancer1).handleResolvedAddresses(resolvedAddressesCaptor1.capture());
XdsConfig expectedXdsConfig = new XdsConfig(
"foo.googleapis.com",
new LbConfig("round_robin", ImmutableMap.<String, Object>of()),
null,
"edsServiceFoo.googleapis.com",
@ -271,7 +270,6 @@ public class CdsLoadBalancerTest {
.setClusterName("bar.googleapis.com")
.setEdsServiceName("edsServiceBar.googleapis.com")
.setLbPolicy("round_robin")
.setEnableLrs(true)
.setLrsServerName("lrsBar.googleapis.com")
.build());
@ -282,6 +280,7 @@ public class CdsLoadBalancerTest {
ArgumentCaptor<ResolvedAddresses> resolvedAddressesCaptor2 = ArgumentCaptor.forClass(null);
verify(edsLoadBalancer2).handleResolvedAddresses(resolvedAddressesCaptor2.capture());
expectedXdsConfig = new XdsConfig(
"bar.googleapis.com",
new LbConfig("round_robin", ImmutableMap.<String, Object>of()),
null,
"edsServiceBar.googleapis.com",
@ -307,10 +306,10 @@ public class CdsLoadBalancerTest {
.setClusterName("bar.googleapis.com")
.setEdsServiceName("edsServiceBar2.googleapis.com")
.setLbPolicy("round_robin")
.setEnableLrs(false)
.build());
verify(edsLoadBalancer2, times(2)).handleResolvedAddresses(resolvedAddressesCaptor2.capture());
expectedXdsConfig = new XdsConfig(
"bar.googleapis.com",
new LbConfig("round_robin", ImmutableMap.<String, Object>of()),
null,
"edsServiceBar2.googleapis.com",
@ -358,7 +357,6 @@ public class CdsLoadBalancerTest {
.setClusterName("foo.googleapis.com")
.setEdsServiceName("edsServiceFoo.googleapis.com")
.setLbPolicy("round_robin")
.setEnableLrs(false)
.setUpstreamTlsContext(upstreamTlsContext)
.build());
@ -391,7 +389,6 @@ public class CdsLoadBalancerTest {
.setClusterName("bar.googleapis.com")
.setEdsServiceName("eds1ServiceFoo.googleapis.com")
.setLbPolicy("round_robin")
.setEnableLrs(false)
.setUpstreamTlsContext(upstreamTlsContext)
.build());
@ -416,7 +413,6 @@ public class CdsLoadBalancerTest {
.setClusterName("bar.googleapis.com")
.setEdsServiceName("eds1ServiceFoo.googleapis.com")
.setLbPolicy("round_robin")
.setEnableLrs(false)
.setUpstreamTlsContext(upstreamTlsContext1)
.build());
@ -436,7 +432,6 @@ public class CdsLoadBalancerTest {
.setClusterName("bar.googleapis.com")
.setEdsServiceName("eds1ServiceFoo.googleapis.com")
.setLbPolicy("round_robin")
.setEnableLrs(false)
.setUpstreamTlsContext(null)
.build());
verify(mockTlsContextManager).releaseClientSslContextProvider(same(mockSslContextProvider1));
@ -501,7 +496,6 @@ public class CdsLoadBalancerTest {
.setClusterName("foo.googleapis.com")
.setEdsServiceName("edsServiceFoo.googleapis.com")
.setLbPolicy("round_robin")
.setEnableLrs(false)
.build());
assertThat(edsLbHelpers).hasSize(1);
@ -542,7 +536,6 @@ public class CdsLoadBalancerTest {
.setClusterName("foo.googleapis.com")
.setEdsServiceName("edsServiceFoo.googleapis.com")
.setLbPolicy("round_robin")
.setEnableLrs(false)
.build());
ArgumentCaptor<EndpointWatcher> endpointWatcherCaptor = ArgumentCaptor.forClass(null);

View File

@ -108,6 +108,7 @@ import org.mockito.junit.MockitoRule;
@RunWith(Parameterized.class)
public class EdsLoadBalancerTest {
private static final String CLUSTER_NAME = "eds-lb-test.example.com";
private static final String SERVICE_AUTHORITY = "test.authority.example.com";
@Rule
@ -234,8 +235,13 @@ public class EdsLoadBalancerTest {
if (isFullFlow) {
xdsClientPoolFromResolveAddresses = new FakeXdsClientPool(
new XdsClientImpl(
serverList, channelFactory, Node.getDefaultInstance(), syncContext,
fakeClock.getScheduledExecutorService(), mock(BackoffPolicy.Provider.class),
SERVICE_AUTHORITY,
serverList,
channelFactory,
Node.getDefaultInstance(),
syncContext,
fakeClock.getScheduledExecutorService(),
mock(BackoffPolicy.Provider.class),
fakeClock.getStopwatchSupplier()));
}
@ -265,7 +271,7 @@ public class EdsLoadBalancerTest {
@Test
public void handleNameResolutionErrorBeforeAndAfterEdsWorkding() {
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, null, null));
// handleResolutionError() before receiving any endpoint update.
edsLb.handleNameResolutionError(Status.DATA_LOSS.withDescription("fake status"));
@ -273,7 +279,7 @@ public class EdsLoadBalancerTest {
// Endpoint update received.
ClusterLoadAssignment clusterLoadAssignment =
buildClusterLoadAssignment("edsServiceName1",
buildClusterLoadAssignment(CLUSTER_NAME,
ImmutableList.of(
buildLocalityLbEndpoints("region1", "zone1", "subzone1",
ImmutableList.of(
@ -293,7 +299,7 @@ public class EdsLoadBalancerTest {
public void handleEdsServiceNameChangeInXdsConfig() {
assertThat(childHelpers).isEmpty();
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName1", null));
ClusterLoadAssignment clusterLoadAssignment =
buildClusterLoadAssignment("edsServiceName1",
ImmutableList.of(
@ -313,7 +319,7 @@ public class EdsLoadBalancerTest {
assertLatestConnectivityState(CONNECTING);
// Change edsServicename to edsServiceName2.
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName2", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName2", null));
// The old balancer was not READY, so it will be shutdown immediately.
verify(childBalancer1).shutdown();
@ -343,7 +349,7 @@ public class EdsLoadBalancerTest {
assertLatestSubchannelPicker(subchannel2);
// Change edsServiceName to edsServiceName3.
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName3", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName3", null));
clusterLoadAssignment =
buildClusterLoadAssignment("edsServiceName3",
ImmutableList.of(
@ -369,7 +375,7 @@ public class EdsLoadBalancerTest {
assertLatestConnectivityState(CONNECTING);
// Change edsServiceName to edsServiceName4.
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName4", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName4", null));
verify(childBalancer3).shutdown();
clusterLoadAssignment =
@ -397,7 +403,7 @@ public class EdsLoadBalancerTest {
assertLatestSubchannelPicker(subchannel4);
// Change edsServiceName to edsServiceName5.
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName5", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName5", null));
clusterLoadAssignment =
buildClusterLoadAssignment("edsServiceName5",
ImmutableList.of(
@ -432,13 +438,13 @@ public class EdsLoadBalancerTest {
@Test
public void firstAndSecondEdsResponseReceived_onWorkingCalledOnce() {
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, null, null));
verify(resourceUpdateCallback, never()).onWorking();
// first EDS response
ClusterLoadAssignment clusterLoadAssignment =
buildClusterLoadAssignment("edsServiceName1",
buildClusterLoadAssignment(CLUSTER_NAME,
ImmutableList.of(
buildLocalityLbEndpoints("region1", "zone1", "subzone1",
ImmutableList.of(
@ -451,7 +457,7 @@ public class EdsLoadBalancerTest {
// second EDS response
clusterLoadAssignment =
buildClusterLoadAssignment("edsServiceName1",
buildClusterLoadAssignment(CLUSTER_NAME,
ImmutableList.of(
buildLocalityLbEndpoints("region1", "zone1", "subzone1",
ImmutableList.of(
@ -466,10 +472,10 @@ public class EdsLoadBalancerTest {
@Test
public void handleAllDropUpdates_pickersAreDropped() {
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, null, null));
ClusterLoadAssignment clusterLoadAssignment = buildClusterLoadAssignment(
"edsServiceName1",
CLUSTER_NAME,
ImmutableList.of(
buildLocalityLbEndpoints("region1", "zone1", "subzone1",
ImmutableList.of(
@ -496,7 +502,7 @@ public class EdsLoadBalancerTest {
assertLatestSubchannelPicker(subchannel);
clusterLoadAssignment = buildClusterLoadAssignment(
"edsServiceName1",
CLUSTER_NAME,
ImmutableList.of(
buildLocalityLbEndpoints("region1", "zone1", "subzone1",
ImmutableList.of(
@ -519,7 +525,7 @@ public class EdsLoadBalancerTest {
@Test
public void handleLocalityAssignmentUpdates_pickersUpdatedFromChildBalancer() {
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, null, null));
LbEndpoint endpoint11 = buildLbEndpoint("addr11.example.com", 8011, HEALTHY, 11);
LbEndpoint endpoint12 = buildLbEndpoint("addr12.example.com", 8012, HEALTHY, 12);
@ -545,7 +551,7 @@ public class EdsLoadBalancerTest {
0);
ClusterLoadAssignment clusterLoadAssignment = buildClusterLoadAssignment(
"edsServiceName1",
CLUSTER_NAME,
ImmutableList.of(localityLbEndpoints1, localityLbEndpoints2, localityLbEndpoints3),
ImmutableList.<DropOverload>of());
receiveEndpointUpdate(clusterLoadAssignment);
@ -606,7 +612,7 @@ public class EdsLoadBalancerTest {
helper, resourceUpdateCallback, lbRegistry, localityStoreFactory, bootstrapper,
channelFactory);
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName1", null));
assertThat(localityStores).hasSize(1);
LocalityStore localityStore = localityStores.peekLast();
@ -643,7 +649,7 @@ public class EdsLoadBalancerTest {
verify(localityStore).updateLocalityStore(endpointUpdate.getLocalityLbEndpointsMap());
// Change cluster name.
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName2", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, "edsServiceName2", null));
assertThat(localityStores).hasSize(2);
localityStore = localityStores.peekLast();
@ -666,7 +672,7 @@ public class EdsLoadBalancerTest {
@Test
public void verifyErrorPropagation_noPreviousEndpointUpdateReceived() {
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, null, null));
verify(resourceUpdateCallback, never()).onError();
// Forwarding 20 seconds so that the xds client will deem EDS resource not available.
@ -677,10 +683,10 @@ public class EdsLoadBalancerTest {
@Test
public void verifyErrorPropagation_withPreviousEndpointUpdateReceived() {
deliverResolvedAddresses(new XdsConfig(null, null, "edsServiceName1", null));
deliverResolvedAddresses(new XdsConfig(CLUSTER_NAME, null, null, null, null));
// Endpoint update received.
ClusterLoadAssignment clusterLoadAssignment =
buildClusterLoadAssignment("edsServiceName1",
buildClusterLoadAssignment(CLUSTER_NAME,
ImmutableList.of(
buildLocalityLbEndpoints("region1", "zone1", "subzone1",
ImmutableList.of(

View File

@ -31,6 +31,8 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.api.v2.core.Locality;
import io.envoyproxy.envoy.api.v2.core.Node;
@ -53,7 +55,6 @@ import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.LoadReportClient.LoadReportCallback;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@ -61,6 +62,7 @@ import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -79,7 +81,17 @@ import org.mockito.MockitoAnnotations;
*/
@RunWith(JUnit4.class)
public class LoadReportClientTest {
private static final Node NODE = Node.newBuilder().setId("LRS test").build();
private static final String TARGET_NAME = "lrs-test.example.com";
// bootstrap node identifier
private static final Node NODE =
Node.newBuilder()
.setId("LRS test")
.setMetadata(
Struct.newBuilder()
.putFields(
"TRAFFICDIRECTOR_NETWORK_HOSTNAME",
Value.newBuilder().setStringValue("default").build()))
.build();
private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER =
new FakeClock.TaskFilter() {
@Override
@ -167,11 +179,14 @@ public class LoadReportClientTest {
.thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L));
lrsClient =
new LoadReportClient(
TARGET_NAME,
channel,
NODE, syncContext,
NODE,
syncContext,
fakeClock.getScheduledExecutorService(),
backoffPolicyProvider,
fakeClock.getStopwatchSupplier());
lrsClient.startLoadReporting(callback);
}
@After
@ -182,22 +197,17 @@ public class LoadReportClientTest {
@Test
public void typicalWorkflow() {
String cluster1 = "cluster-foo.googleapis.com";
String cluster2 = "cluster-bar.googleapis.com";
ClusterStats rawStats1 = generateClusterLoadStats(cluster1);
ClusterStats rawStats2 = generateClusterLoadStats(cluster2);
when(loadStatsStore1.generateLoadReport()).thenReturn(rawStats1);
when(loadStatsStore2.generateLoadReport()).thenReturn(rawStats2);
lrsClient.addLoadStatsStore(cluster1, null, loadStatsStore1);
lrsClient.addLoadStatsStore(cluster2, null, loadStatsStore2);
lrsClient.startLoadReporting(callback);
verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
StreamObserver<LoadStatsResponse> responseObserver = lrsResponseObserverCaptor.getValue();
StreamObserver<LoadStatsRequest> requestObserver =
Iterables.getOnlyElement(lrsRequestObservers);
InOrder inOrder = inOrder(requestObserver, callback);
inOrder.verify(requestObserver).onNext(eq(buildInitialRequest(cluster1, cluster2)));
inOrder.verify(requestObserver).onNext(eq(buildInitialRequest()));
String cluster1 = "cluster-foo.googleapis.com";
ClusterStats rawStats1 = generateClusterLoadStats(cluster1, null);
when(loadStatsStore1.generateLoadReport()).thenReturn(rawStats1);
lrsClient.addLoadStatsStore(cluster1, null, loadStatsStore1);
// Management server asks to report loads for cluster1.
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 1000));
@ -213,7 +223,13 @@ public class LoadReportClientTest {
fakeClock.forwardNanos(1000);
inOrder.verify(requestObserver).onNext(argThat(expectedLoadReportMatcher));
// Management server updates the interval of sending load reports.
String cluster2 = "cluster-bar.googleapis.com";
ClusterStats rawStats2 = generateClusterLoadStats(cluster2, null);
when(loadStatsStore2.generateLoadReport()).thenReturn(rawStats2);
lrsClient.addLoadStatsStore(cluster2, null, loadStatsStore2);
// Management server updates the interval of sending load reports, while still asking for
// loads to cluster1 only.
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 2000));
inOrder.verify(callback).onReportResponse(2000);
@ -226,7 +242,6 @@ public class LoadReportClientTest {
// Management server asks to report loads for cluster1 and cluster2.
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1, cluster2), 2000));
inOrder.verify(callback).onReportResponse(2000);
fakeClock.forwardNanos(2000);
inOrder.verify(requestObserver)
@ -236,7 +251,6 @@ public class LoadReportClientTest {
// Load reports for cluster1 is no longer wanted.
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster2), 2000));
inOrder.verify(callback).onReportResponse(2000);
fakeClock.forwardNanos(2000);
inOrder.verify(requestObserver)
@ -245,7 +259,6 @@ public class LoadReportClientTest {
// Management server asks loads for a cluster that client has no load data.
responseObserver
.onNext(buildLrsResponse(ImmutableList.of("cluster-unknown.googleapis.com"), 2000));
inOrder.verify(callback).onReportResponse(2000);
fakeClock.forwardNanos(2000);
ArgumentCaptor<LoadStatsRequest> reportCaptor = ArgumentCaptor.forClass(null);
@ -257,12 +270,6 @@ public class LoadReportClientTest {
@Test
public void lrsStreamClosedAndRetried() {
String clusterName = "cluster-foo.googleapis.com";
ClusterStats stats = generateClusterLoadStats(clusterName);
when(loadStatsStore1.generateLoadReport()).thenReturn(stats);
lrsClient.addLoadStatsStore(clusterName, null, loadStatsStore1);
lrsClient.startLoadReporting(callback);
InOrder inOrder = inOrder(mockLoadReportingService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
inOrder.verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
@ -270,8 +277,14 @@ public class LoadReportClientTest {
assertThat(lrsRequestObservers).hasSize(1);
StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
String clusterName = "cluster-foo.googleapis.com";
String clusterServiceName = "service-blade.googleapis.com";
ClusterStats stats = generateClusterLoadStats(clusterName, clusterServiceName);
when(loadStatsStore1.generateLoadReport()).thenReturn(stats);
lrsClient.addLoadStatsStore(clusterName, null, loadStatsStore1);
// First balancer RPC
verify(requestObserver).onNext(eq(buildInitialRequest(clusterName)));
verify(requestObserver).onNext(eq(buildInitialRequest()));
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Balancer closes it immediately (erroneously)
@ -291,7 +304,7 @@ public class LoadReportClientTest {
responseObserver = lrsResponseObserverCaptor.getValue();
assertThat(lrsRequestObservers).hasSize(1);
requestObserver = lrsRequestObservers.poll();
verify(requestObserver).onNext(eq(buildInitialRequest(clusterName)));
verify(requestObserver).onNext(eq(buildInitialRequest()));
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Balancer closes it with an error.
@ -310,7 +323,7 @@ public class LoadReportClientTest {
responseObserver = lrsResponseObserverCaptor.getValue();
assertThat(lrsRequestObservers).hasSize(1);
requestObserver = lrsRequestObservers.poll();
verify(requestObserver).onNext(eq(buildInitialRequest(clusterName)));
verify(requestObserver).onNext(eq(buildInitialRequest()));
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Balancer sends a response asking for loads of the cluster.
@ -326,7 +339,7 @@ public class LoadReportClientTest {
responseObserver = lrsResponseObserverCaptor.getValue();
assertThat(lrsRequestObservers).hasSize(1);
requestObserver = lrsRequestObservers.poll();
verify(requestObserver).onNext(eq(buildInitialRequest(clusterName)));
verify(requestObserver).onNext(eq(buildInitialRequest()));
// Fail the retry after spending 4ns
fakeClock.forwardNanos(4);
@ -344,7 +357,7 @@ public class LoadReportClientTest {
inOrder.verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
assertThat(lrsRequestObservers).hasSize(1);
requestObserver = lrsRequestObservers.poll();
verify(requestObserver).onNext(eq(buildInitialRequest(clusterName)));
verify(requestObserver).onNext(eq(buildInitialRequest()));
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Load reporting back to normal.
@ -363,19 +376,19 @@ public class LoadReportClientTest {
@Test
public void raceBetweenLoadReportingAndLbStreamClosure() {
String clusterName = "cluster-foo.googleapis.com";
ClusterStats stats = generateClusterLoadStats(clusterName);
when(loadStatsStore1.generateLoadReport()).thenReturn(stats);
lrsClient.addLoadStatsStore(clusterName, null, loadStatsStore1);
lrsClient.startLoadReporting(callback);
verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
StreamObserver<LoadStatsResponse> responseObserver = lrsResponseObserverCaptor.getValue();
assertThat(lrsRequestObservers).hasSize(1);
StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
String clusterName = "cluster-foo.googleapis.com";
String clusterServiceName = "service-blade.googleapis.com";
ClusterStats stats = generateClusterLoadStats(clusterName, clusterServiceName);
when(loadStatsStore1.generateLoadReport()).thenReturn(stats);
lrsClient.addLoadStatsStore(clusterName, null, loadStatsStore1);
// First balancer RPC
verify(requestObserver).onNext(eq(buildInitialRequest(clusterName)));
verify(requestObserver).onNext(eq(buildInitialRequest()));
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Simulate receiving a response from traffic director.
@ -412,19 +425,28 @@ public class LoadReportClientTest {
.build();
}
private static LoadStatsRequest buildInitialRequest(String... clusters) {
List<ClusterStats> clusterStatsList = new ArrayList<>();
for (String cluster : clusters) {
clusterStatsList.add(ClusterStats.newBuilder().setClusterName(cluster).build());
}
private static LoadStatsRequest buildInitialRequest() {
return
LoadStatsRequest.newBuilder().setNode(NODE).addAllClusterStats(clusterStatsList).build();
LoadStatsRequest.newBuilder()
.setNode(
Node.newBuilder()
.setId("LRS test")
.setMetadata(
Struct.newBuilder()
.putFields(
"TRAFFICDIRECTOR_NETWORK_HOSTNAME",
Value.newBuilder().setStringValue("default").build())
.putFields(
LoadReportClient.TARGET_NAME_METADATA_KEY,
Value.newBuilder().setStringValue(TARGET_NAME).build())))
.build();
}
/**
* Generates a raw service load stats report with random data.
*/
private static ClusterStats generateClusterLoadStats(String clusterName) {
private static ClusterStats generateClusterLoadStats(
String clusterName, @Nullable String clusterServiceName) {
long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long callsSucceeded = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long callsFailed = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
@ -432,30 +454,32 @@ public class LoadReportClientTest {
long numLbDrops = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long numThrottleDrops = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
return
ClusterStats.newBuilder()
.setClusterName(clusterName)
.addUpstreamLocalityStats(
UpstreamLocalityStats.newBuilder()
.setLocality(
Locality.newBuilder()
.setRegion("region-foo")
.setZone("zone-bar")
.setSubZone("subzone-baz"))
.setTotalRequestsInProgress(callsInProgress)
.setTotalSuccessfulRequests(callsSucceeded)
.setTotalErrorRequests(callsFailed)
.setTotalIssuedRequests(callsIssued))
.addDroppedRequests(
DroppedRequests.newBuilder()
.setCategory("lb")
.setDroppedCount(numLbDrops))
.addDroppedRequests(
DroppedRequests.newBuilder()
.setCategory("throttle")
.setDroppedCount(numThrottleDrops))
.setTotalDroppedRequests(numLbDrops + numThrottleDrops)
.build();
ClusterStats.Builder clusterStatsBuilder = ClusterStats.newBuilder();
clusterStatsBuilder.setClusterName(clusterName);
if (clusterServiceName != null) {
clusterStatsBuilder.setClusterServiceName(clusterServiceName);
}
clusterStatsBuilder.addUpstreamLocalityStats(
UpstreamLocalityStats.newBuilder()
.setLocality(
Locality.newBuilder()
.setRegion("region-foo")
.setZone("zone-bar")
.setSubZone("subzone-baz"))
.setTotalRequestsInProgress(callsInProgress)
.setTotalSuccessfulRequests(callsSucceeded)
.setTotalErrorRequests(callsFailed)
.setTotalIssuedRequests(callsIssued))
.addDroppedRequests(
DroppedRequests.newBuilder()
.setCategory("lb")
.setDroppedCount(numLbDrops))
.addDroppedRequests(
DroppedRequests.newBuilder()
.setCategory("throttle")
.setDroppedCount(numThrottleDrops))
.setTotalDroppedRequests(numLbDrops + numThrottleDrops);
return clusterStatsBuilder.build();
}
/**
@ -476,6 +500,11 @@ public class LoadReportClientTest {
@Override
public boolean matches(LoadStatsRequest argument) {
if (!argument.getNode().getMetadata()
.getFieldsOrThrow(LoadReportClient.TARGET_NAME_METADATA_KEY)
.getStringValue().equals(TARGET_NAME)) {
return false;
}
if (argument.getClusterStatsCount() != expectedStats.size()) {
return false;
}

View File

@ -45,6 +45,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
@ -122,6 +123,7 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class XdsClientImplTest {
private static final String TARGET_NAME = "foo.googleapis.com:8080";
private static final String HOSTNAME = "foo.googleapis.com";
private static final int PORT = 8080;
@ -250,7 +252,6 @@ public class XdsClientImplTest {
new CancellationListener() {
@Override
public void cancelled(Context context) {
call.cancelled = true;
lrsEnded.set(true);
}
}, MoreExecutors.directExecutor());
@ -282,8 +283,14 @@ public class XdsClientImplTest {
};
xdsClient =
new XdsClientImpl(servers, channelFactory, NODE, syncContext,
fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
new XdsClientImpl(
TARGET_NAME,
servers,
channelFactory,
NODE,
syncContext,
fakeClock.getScheduledExecutorService(),
backoffPolicyProvider,
fakeClock.getStopwatchSupplier());
// Only the connection to management server is established, no RPC request is sent until at
// least one watcher is registered.
@ -1266,10 +1273,9 @@ public class XdsClientImplTest {
verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture());
ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue();
assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate.getEdsServiceName()).isNull();
assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate.isEnableLrs()).isEqualTo(false);
assertThat(clusterUpdate.getLrsServerName()).isNull();
// Management server sends back another CDS response updating the requested Cluster.
clusters = ImmutableList.of(
@ -1292,8 +1298,7 @@ public class XdsClientImplTest {
assertThat(clusterUpdate.getEdsServiceName())
.isEqualTo("eds-cluster-foo.googleapis.com");
assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate.isEnableLrs()).isTrue();
assertThat(clusterUpdate.getLrsServerName()).isEmpty();
assertThat(clusterUpdate.getLrsServerName()).isEqualTo("");
}
/**
@ -1371,20 +1376,18 @@ public class XdsClientImplTest {
ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
assertThat(clusterUpdate1.getLrsServerName()).isNull();
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate2.getEdsServiceName()).isNull();
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false);
assertThat(clusterUpdate2.getLrsServerName()).isNull();
verify(watcher3, never()).onClusterChanged(any(ClusterUpdate.class));
verify(watcher3, never()).onError(any(Status.class));
@ -1421,20 +1424,18 @@ public class XdsClientImplTest {
clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
assertThat(clusterUpdate1.getLrsServerName()).isNull();
clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2, times(2)).onClusterChanged(clusterUpdateCaptor2.capture());
clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate2.getEdsServiceName()).isNull();
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false);
assertThat(clusterUpdate2.getLrsServerName()).isNull();
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor3 = ArgumentCaptor.forClass(null);
verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture());
@ -1443,8 +1444,7 @@ public class XdsClientImplTest {
assertThat(clusterUpdate3.getEdsServiceName())
.isEqualTo("eds-cluster-bar.googleapis.com");
assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true);
assertThat(clusterUpdate3.getLrsServerName()).isEmpty();
assertThat(clusterUpdate3.getLrsServerName()).isEqualTo("");
}
/**
@ -1484,10 +1484,9 @@ public class XdsClientImplTest {
verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
assertThat(clusterUpdate1.getLrsServerName()).isNull();
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
// Another cluster watcher interested in the same cluster is added.
@ -1500,10 +1499,9 @@ public class XdsClientImplTest {
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
ClusterUpdate clusterUpdate2 = clusterUpdateCaptor2.getValue();
assertThat(clusterUpdate2.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate2.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate2.getEdsServiceName()).isNull();
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(false);
assertThat(clusterUpdate2.getLrsServerName()).isNull();
verifyNoMoreInteractions(requestObserver);
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
@ -1543,10 +1541,9 @@ public class XdsClientImplTest {
verify(watcher1).onClusterChanged(clusterUpdateCaptor1.capture());
ClusterUpdate clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
assertThat(clusterUpdate1.getLrsServerName()).isNull();
// Add another cluster watcher for a different cluster.
ClusterWatcher watcher2 = mock(ClusterWatcher.class);
@ -1581,10 +1578,9 @@ public class XdsClientImplTest {
verify(watcher1, times(2)).onClusterChanged(clusterUpdateCaptor1.capture());
clusterUpdate1 = clusterUpdateCaptor1.getValue();
assertThat(clusterUpdate1.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate1.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate1.getEdsServiceName()).isNull();
assertThat(clusterUpdate1.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate1.isEnableLrs()).isEqualTo(false);
assertThat(clusterUpdate1.getLrsServerName()).isNull();
ArgumentCaptor<ClusterUpdate> clusterUpdateCaptor2 = ArgumentCaptor.forClass(null);
verify(watcher2).onClusterChanged(clusterUpdateCaptor2.capture());
@ -1593,8 +1589,7 @@ public class XdsClientImplTest {
assertThat(clusterUpdate2.getEdsServiceName())
.isEqualTo("eds-cluster-bar.googleapis.com");
assertThat(clusterUpdate2.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate2.isEnableLrs()).isEqualTo(true);
assertThat(clusterUpdate2.getLrsServerName()).isEmpty();
assertThat(clusterUpdate2.getLrsServerName()).isEqualTo("");
// Cancel one of the watcher.
xdsClient.cancelClusterDataWatch("cluster-foo.googleapis.com", watcher1);
@ -1660,11 +1655,9 @@ public class XdsClientImplTest {
verify(watcher3).onClusterChanged(clusterUpdateCaptor3.capture());
ClusterUpdate clusterUpdate3 = clusterUpdateCaptor3.getValue();
assertThat(clusterUpdate3.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate3.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate3.getEdsServiceName()).isNull();
assertThat(clusterUpdate3.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate3.isEnableLrs()).isEqualTo(true);
assertThat(clusterUpdate2.getLrsServerName()).isEmpty();
assertThat(clusterUpdate2.getLrsServerName()).isEqualTo("");
verifyNoMoreInteractions(watcher1, watcher2);
@ -1768,11 +1761,9 @@ public class XdsClientImplTest {
verify(clusterWatcher).onClusterChanged(clusterUpdateCaptor.capture());
ClusterUpdate clusterUpdate = clusterUpdateCaptor.getValue();
assertThat(clusterUpdate.getClusterName()).isEqualTo("cluster-foo.googleapis.com");
assertThat(clusterUpdate.getEdsServiceName())
.isEqualTo("cluster-foo.googleapis.com"); // default to cluster name
assertThat(clusterUpdate.getEdsServiceName()).isNull();
assertThat(clusterUpdate.getLbPolicy()).isEqualTo("round_robin");
assertThat(clusterUpdate.isEnableLrs()).isEqualTo(true);
assertThat(clusterUpdate.getLrsServerName()).isEmpty();
assertThat(clusterUpdate.getLrsServerName()).isEqualTo("");
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
// No cluster is available.
@ -3097,14 +3088,31 @@ public class XdsClientImplTest {
*/
@Test
public void reportLoadStatsToServer() {
LoadStatsStore loadStatsStore = mock(LoadStatsStore.class);
String clusterName = "cluster-foo.googleapis.com";
LoadStatsStore loadStatsStore = new LoadStatsStoreImpl(clusterName, null);
ArgumentCaptor<LoadStatsRequest> requestCaptor = ArgumentCaptor.forClass(null);
xdsClient.reportClientStats(clusterName, null, loadStatsStore);
LoadReportCall lrsCall = loadReportCalls.poll();
verify(lrsCall.requestObserver).onNext(eq(buildInitialLoadStatsRequest(clusterName)));
verify(lrsCall.requestObserver).onNext(requestCaptor.capture());
assertThat(requestCaptor.getValue().getClusterStatsCount())
.isEqualTo(0); // initial request
lrsCall.responseObserver.onNext(
LoadStatsResponse.newBuilder()
.addClusters(clusterName)
.setLoadReportingInterval(Durations.fromNanos(1000L))
.build());
fakeClock.forwardNanos(1000L);
verify(lrsCall.requestObserver, times(2)).onNext(requestCaptor.capture());
ClusterStats report = Iterables.getOnlyElement(requestCaptor.getValue().getClusterStatsList());
assertThat(report.getClusterName()).isEqualTo(clusterName);
xdsClient.cancelClientStatsReport(clusterName, null);
assertThat(lrsCall.cancelled).isTrue();
fakeClock.forwardNanos(1000L);
verify(lrsCall.requestObserver, times(3)).onNext(requestCaptor.capture());
assertThat(requestCaptor.getValue().getClusterStatsCount())
.isEqualTo(0); // no more stats reported
// See more test on LoadReportClientTest.java
}
@ -3536,14 +3544,6 @@ public class XdsClientImplTest {
assertThat(res).isEqualTo(expectedString);
}
private static LoadStatsRequest buildInitialLoadStatsRequest(String clusterName) {
return
LoadStatsRequest.newBuilder()
.setNode(NODE)
.addClusterStats(ClusterStats.newBuilder().setClusterName(clusterName))
.build();
}
/**
* Matcher for DiscoveryRequest without the comparison of error_details field, which is used for
* management server debugging purposes.
@ -3593,7 +3593,6 @@ public class XdsClientImplTest {
private final StreamObserver<LoadStatsRequest> requestObserver;
@SuppressWarnings("unused")
private final StreamObserver<LoadStatsResponse> responseObserver;
private boolean cancelled;
LoadReportCall(StreamObserver<LoadStatsRequest> requestObserver,
StreamObserver<LoadStatsResponse> responseObserver) {

View File

@ -21,7 +21,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import io.grpc.xds.XdsClient.ClusterUpdate;
import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool;
import io.grpc.xds.XdsClient.XdsClientFactory;
import org.junit.Rule;
@ -38,24 +37,6 @@ public class XdsClientTest {
@Rule
public final ExpectedException thrown = ExpectedException.none();
@Test
public void buildClusterUpdate_defaultToClusterNameWhenEdsServiceNameNotSet() {
ClusterUpdate clusterUpdate1 =
ClusterUpdate.newBuilder()
.setClusterName("foo.googleapis.com")
.setEdsServiceName("bar.googleapis.com")
.setLbPolicy("round_robin")
.build();
assertThat(clusterUpdate1.getEdsServiceName()).isEqualTo("bar.googleapis.com");
ClusterUpdate clusterUpdate2 =
ClusterUpdate.newBuilder()
.setClusterName("foo.googleapis.com")
.setLbPolicy("round_robin")
.build();
assertThat(clusterUpdate2.getEdsServiceName()).isEqualTo("foo.googleapis.com");
}
@Test
public void refCountedXdsClientObjectPool_getObjectShouldMatchReturnObject() {
XdsClientFactory xdsClientFactory = new XdsClientFactory() {

View File

@ -155,6 +155,7 @@ public class XdsLoadBalancerProviderTest {
@Test
public void parseLoadBalancingConfigPolicy() throws Exception {
String rawLbConfig = "{"
+ "\"cluster\" : \"foo.googleapis.com\","
+ "\"childPolicy\" : [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"supported_1\" : {}}],"
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"round_robin\" : {\"key\" : \"val\"}},"
+ "{\"supported_2\" : {\"key\" : \"val\"}}],"
@ -169,6 +170,7 @@ public class XdsLoadBalancerProviderTest {
assertThat(configOrError.getConfig()).isInstanceOf(XdsConfig.class);
assertThat(configOrError.getConfig()).isEqualTo(
new XdsConfig(
"foo.googleapis.com",
ServiceConfigUtil.unwrapLoadBalancingConfig(
checkObject(JsonParser.parse("{\"supported_1\" : {}}"))),
ServiceConfigUtil.unwrapLoadBalancingConfig(