xds: support ring_hash as the endpoint-level LB policy (#7991)

Update LB policy config generation to support ring hash policy as the endpoint-level LB policy.

- Changed the CDS LB policy to accept RING_HASH as the endpoint LB policy from CDS updates. This configuration is directly passed to its child policy (aka, ClusterResolverLoadBalancer) in its config.

- Changed ClusterResolverLoadBalancer to generate different LB configs for its downstream LB policies, depending on the endpoint-level LB policies.
  - If the endpoint-level LB policy is ROUND_ROBIN, the downstream LB policy hierarchy is: PriorityLB -> ClusterImplLB -> WeightedTargetLB -> RoundRobinLB
  - If the endpoin-level LB policy is RNIG_HASH, the downstream LB policy hierarchy is: PriorityLB -> ClusterImplLB -> RingHashLB.
This commit is contained in:
Chengyuan Zhang 2021-04-16 12:46:55 -07:00 committed by GitHub
parent 31cfb6d32e
commit b4fe07d22d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 421 additions and 257 deletions

View File

@ -32,9 +32,11 @@ import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate; import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy;
import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayDeque; import java.util.ArrayDeque;
@ -181,15 +183,16 @@ final class CdsLoadBalancer2 extends LoadBalancer {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable)); helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
return; return;
} }
String endpointPickingPolicy = root.result.lbPolicy(); LoadBalancerProvider lbProvider = null;
LoadBalancerProvider localityPickingLbProvider = Object lbConfig = null;
lbRegistry.getProvider(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded if (root.result.lbPolicy() == LbPolicy.RING_HASH) {
LoadBalancerProvider endpointPickingLbProvider = lbProvider = lbRegistry.getProvider("ring_hash");
lbRegistry.getProvider(endpointPickingPolicy); lbConfig = new RingHashConfig(root.result.minRingSize(), root.result.maxRingSize());
} else {
lbProvider = lbRegistry.getProvider("round_robin");
}
ClusterResolverConfig config = new ClusterResolverConfig( ClusterResolverConfig config = new ClusterResolverConfig(
Collections.unmodifiableList(instances), Collections.unmodifiableList(instances), new PolicySelection(lbProvider, lbConfig));
new PolicySelection(localityPickingLbProvider, null /* by cluster_resolver LB policy */),
new PolicySelection(endpointPickingLbProvider, null));
if (childLb == null) { if (childLb == null) {
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper); childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
} }

View File

@ -22,7 +22,6 @@ import static io.grpc.xds.EnvoyServerProtoData.TRANSPORT_SOCKET_NAME_TLS;
import com.github.udpa.udpa.type.v1.TypedStruct; import com.github.udpa.udpa.type.v1.TypedStruct;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.base.Strings; import com.google.common.base.Strings;
@ -832,8 +831,6 @@ final class ClientXdsClient extends AbstractXdsClient {
} }
CdsUpdate.Builder updateBuilder = structOrError.getStruct(); CdsUpdate.Builder updateBuilder = structOrError.getStruct();
String lbPolicy = CaseFormat.UPPER_UNDERSCORE.to(
CaseFormat.LOWER_UNDERSCORE, cluster.getLbPolicy().name());
if (cluster.getLbPolicy() == LbPolicy.RING_HASH) { if (cluster.getLbPolicy() == LbPolicy.RING_HASH) {
RingHashLbConfig lbConfig = cluster.getRingHashLbConfig(); RingHashLbConfig lbConfig = cluster.getRingHashLbConfig();
@ -841,10 +838,10 @@ final class ClientXdsClient extends AbstractXdsClient {
throw new ResourceInvalidException( throw new ResourceInvalidException(
"Unsupported ring hash function: " + lbConfig.getHashFunction()); "Unsupported ring hash function: " + lbConfig.getHashFunction());
} }
updateBuilder.lbPolicy(lbPolicy, lbConfig.getMinimumRingSize().getValue(), updateBuilder.lbPolicy(CdsUpdate.LbPolicy.RING_HASH,
lbConfig.getMaximumRingSize().getValue()); lbConfig.getMinimumRingSize().getValue(), lbConfig.getMaximumRingSize().getValue());
} else if (cluster.getLbPolicy() == LbPolicy.ROUND_ROBIN) { } else if (cluster.getLbPolicy() == LbPolicy.ROUND_ROBIN) {
updateBuilder.lbPolicy(lbPolicy); updateBuilder.lbPolicy(CdsUpdate.LbPolicy.ROUND_ROBIN);
} else { } else {
throw new ResourceInvalidException("Unsupported lb policy: " + cluster.getLbPolicy()); throw new ResourceInvalidException("Unsupported lb policy: " + cluster.getLbPolicy());
} }

View File

@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -76,7 +77,8 @@ import javax.annotation.Nullable;
* used in the downstream LB policies for fine-grained load balancing purposes. * used in the downstream LB policies for fine-grained load balancing purposes.
*/ */
final class ClusterResolverLoadBalancer extends LoadBalancer { final class ClusterResolverLoadBalancer extends LoadBalancer {
// DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode
// to an empty locality.
private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", ""); private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", "");
private final XdsLogger logger; private final XdsLogger logger;
private final String authority; private final String authority;
@ -156,12 +158,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
private final Helper helper; private final Helper helper;
private final List<String> clusters = new ArrayList<>(); private final List<String> clusters = new ArrayList<>();
private final Map<String, ClusterState> clusterStates = new HashMap<>(); private final Map<String, ClusterState> clusterStates = new HashMap<>();
// An aggregate cluster is thought of as a cluster that groups the endpoints of the underlying private PolicySelection endpointLbPolicy;
// clusters together for load balancing purposes only. Load balancing policies (both locality
// level and endpoint level) are configured by the aggregate cluster and apply to all of its
// underlying clusters.
private PolicySelection localityPickingPolicy;
private PolicySelection endpointPickingPolicy;
private ResolvedAddresses resolvedAddresses; private ResolvedAddresses resolvedAddresses;
private LoadBalancer childLb; private LoadBalancer childLb;
@ -175,20 +172,18 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
this.resolvedAddresses = resolvedAddresses; this.resolvedAddresses = resolvedAddresses;
ClusterResolverConfig config = ClusterResolverConfig config =
(ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
localityPickingPolicy = config.localityPickingPolicy; endpointLbPolicy = config.lbPolicy;
endpointPickingPolicy = config.endpointPickingPolicy;
for (DiscoveryMechanism instance : config.discoveryMechanisms) { for (DiscoveryMechanism instance : config.discoveryMechanisms) {
clusters.add(instance.cluster); clusters.add(instance.cluster);
ClusterState state; ClusterState state;
if (instance.type == DiscoveryMechanism.Type.EDS) { if (instance.type == DiscoveryMechanism.Type.EDS) {
state = new EdsClusterState(instance.cluster, instance.edsServiceName, state = new EdsClusterState(instance.cluster, instance.edsServiceName,
instance.lrsServerName, instance.maxConcurrentRequests, instance.tlsContext); instance.lrsServerName, instance.maxConcurrentRequests, instance.tlsContext);
clusterStates.put(instance.cluster, state);
} else { // logical DNS } else { // logical DNS
state = new LogicalDnsClusterState(instance.cluster, instance.lrsServerName, state = new LogicalDnsClusterState(instance.cluster, instance.lrsServerName,
instance.maxConcurrentRequests, instance.tlsContext); instance.maxConcurrentRequests, instance.tlsContext);
clusterStates.put(instance.cluster, state);
} }
clusterStates.put(instance.cluster, state);
state.start(); state.start();
} }
} }
@ -392,8 +387,11 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
for (LbEndpoint endpoint : localityLbInfo.endpoints()) { for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
if (endpoint.isHealthy()) { if (endpoint.isHealthy()) {
discard = false; discard = false;
long weight =
(long) localityLbInfo.localityWeight() * endpoint.loadBalancingWeight();
Attributes attr = endpoint.eag().getAttributes().toBuilder() Attributes attr = endpoint.eag().getAttributes().toBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY, locality).build(); .set(InternalXdsAttributes.ATTR_LOCALITY, locality)
.set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight).build();
EquivalentAddressGroup eag = EquivalentAddressGroup eag =
new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr); new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr);
eag = AddressFilter.setPathFilter( eag = AddressFilter.setPathFilter(
@ -419,10 +417,10 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
} }
List<String> priorities = new ArrayList<>(prioritizedLocalityWeights.keySet()); List<String> priorities = new ArrayList<>(prioritizedLocalityWeights.keySet());
Collections.sort(priorities); Collections.sort(priorities);
Map<String, PriorityChildConfig> priorityChildConfigs = generatePriorityChildConfigs( Map<String, PriorityChildConfig> priorityChildConfigs =
generateEdsBasedPriorityChildConfigs(
name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext, name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext,
localityPickingPolicy, endpointPickingPolicy, true, lbRegistry, endpointLbPolicy, lbRegistry, prioritizedLocalityWeights, dropOverloads);
prioritizedLocalityWeights, dropOverloads);
status = Status.OK; status = Status.OK;
resolved = true; resolved = true;
result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities); result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities);
@ -532,9 +530,12 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
return; return;
} }
backoffPolicy = null; // reset backoff sequence if succeeded backoffPolicy = null; // reset backoff sequence if succeeded
// Arbitrary priority notation for all DNS-resolved endpoints.
String priorityName = priorityName(name, 0); // value doesn't matter String priorityName = priorityName(name, 0); // value doesn't matter
List<EquivalentAddressGroup> addresses = new ArrayList<>(); List<EquivalentAddressGroup> addresses = new ArrayList<>();
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) { for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
// No weight attribute is attached, all endpoint-level LB policy should be able
// to handle such it.
Attributes attr = eag.getAttributes().toBuilder().set( Attributes attr = eag.getAttributes().toBuilder().set(
InternalXdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY).build(); InternalXdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY).build();
eag = new EquivalentAddressGroup(eag.getAddresses(), attr); eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
@ -542,12 +543,9 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
eag, Arrays.asList(priorityName, LOGICAL_DNS_CLUSTER_LOCALITY.toString())); eag, Arrays.asList(priorityName, LOGICAL_DNS_CLUSTER_LOCALITY.toString()));
addresses.add(eag); addresses.add(eag);
} }
PolicySelection endpointPickingPolicy = PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
new PolicySelection(lbRegistry.getProvider("pick_first"), null);
PriorityChildConfig priorityChildConfig = generatePriorityChildConfig(
name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext, name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext,
endpointPickingPolicy, false, lbRegistry, lbRegistry, Collections.<DropOverload>emptyList());
Collections.<DropOverload>emptyList());
status = Status.OK; status = Status.OK;
resolved = true; resolved = true;
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
@ -614,58 +612,74 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
} }
/** /**
* Generates the config to be used in the priority LB policy for a single priority. * Generates the config to be used in the priority LB policy for the single priority of
* logical DNS cluster.
* *
* <p>priority LB -> cluster_impl LB -> pick_first * <p>priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first
*/ */
private static PriorityChildConfig generatePriorityChildConfig( private static PriorityChildConfig generateDnsBasedPriorityChildConfig(
String cluster, @Nullable String edsServiceName, @Nullable String lrsServerName, String cluster, @Nullable String edsServiceName, @Nullable String lrsServerName,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
PolicySelection endpointPickingPolicy, boolean ignoreReresolution,
LoadBalancerRegistry lbRegistry, List<DropOverload> dropOverloads) { LoadBalancerRegistry lbRegistry, List<DropOverload> dropOverloads) {
// Override endpoint-level LB policy with pick_first for logical DNS cluster.
PolicySelection endpointLbPolicy =
new PolicySelection(lbRegistry.getProvider("pick_first"), null);
ClusterImplConfig clusterImplConfig = ClusterImplConfig clusterImplConfig =
new ClusterImplConfig(cluster, edsServiceName, lrsServerName, maxConcurrentRequests, new ClusterImplConfig(cluster, edsServiceName, lrsServerName, maxConcurrentRequests,
dropOverloads, endpointPickingPolicy, tlsContext); dropOverloads, endpointLbPolicy, tlsContext);
LoadBalancerProvider clusterImplLbProvider = LoadBalancerProvider clusterImplLbProvider =
lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
PolicySelection clusterImplPolicy = PolicySelection clusterImplPolicy =
new PolicySelection(clusterImplLbProvider, clusterImplConfig); new PolicySelection(clusterImplLbProvider, clusterImplConfig);
return new PriorityChildConfig(clusterImplPolicy, ignoreReresolution); return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/);
} }
/** /**
* Generates configs to be used in the priority LB policy for priorities in the cluster. * Generates configs to be used in the priority LB policy for priorities in an EDS cluster.
* *
* <p>priority LB -> cluster_impl LB (one per priority) -> weighted_target LB * <p>priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB
* -> round_robin (one per locality)) * -> round_robin (one per locality)) / ring_hash
*/ */
private static Map<String, PriorityChildConfig> generatePriorityChildConfigs( private static Map<String, PriorityChildConfig> generateEdsBasedPriorityChildConfigs(
String cluster, @Nullable String edsServiceName, @Nullable String lrsServerName, String cluster, @Nullable String edsServiceName, @Nullable String lrsServerName,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext,
PolicySelection localityPickingPolicy, PolicySelection endpointPickingPolicy, PolicySelection endpointLbPolicy, LoadBalancerRegistry lbRegistry,
boolean ignoreReresolution, LoadBalancerRegistry lbRegistry,
Map<String, Map<Locality, Integer>> prioritizedLocalityWeights, Map<String, Map<Locality, Integer>> prioritizedLocalityWeights,
List<DropOverload> dropOverloads) { List<DropOverload> dropOverloads) {
Map<String, PriorityChildConfig> configs = new HashMap<>(); Map<String, PriorityChildConfig> configs = new HashMap<>();
for (String priority : prioritizedLocalityWeights.keySet()) { for (String priority : prioritizedLocalityWeights.keySet()) {
PolicySelection leafPolicy = endpointLbPolicy;
// Depending on the endpoint-level load balancing policy, different LB hierarchy may be
// created. If the endpoint-level LB policy is round_robin, it creates a two-level LB
// hierarchy: a locality-level LB policy that balances load according to locality weights
// followed by an endpoint-level LB policy that simply rounds robin the endpoints within
// the locality. If the endpoint-level LB policy is ring_hash, it creates a unified LB
// policy that balances load by weighing the product of each endpoint's weight and the
// weight of the locality it belongs to.
if (endpointLbPolicy.getProvider().getPolicyName().equals("round_robin")) {
Map<Locality, Integer> localityWeights = prioritizedLocalityWeights.get(priority); Map<Locality, Integer> localityWeights = prioritizedLocalityWeights.get(priority);
Map<String, WeightedPolicySelection> targets = new HashMap<>(); Map<String, WeightedPolicySelection> targets = new HashMap<>();
for (Locality locality : localityWeights.keySet()) { for (Locality locality : localityWeights.keySet()) {
int weight = localityWeights.get(locality); int weight = localityWeights.get(locality);
targets.put(localityName(locality), WeightedPolicySelection target = new WeightedPolicySelection(weight, endpointLbPolicy);
new WeightedPolicySelection(weight, endpointPickingPolicy)); targets.put(localityName(locality), target);
}
LoadBalancerProvider weightedTargetLbProvider =
lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME);
WeightedTargetConfig weightedTargetConfig =
new WeightedTargetConfig(Collections.unmodifiableMap(targets));
leafPolicy = new PolicySelection(weightedTargetLbProvider, weightedTargetConfig);
} }
PolicySelection localityPicking = new PolicySelection(
localityPickingPolicy.getProvider(),
new WeightedTargetConfig(Collections.unmodifiableMap(targets)));
ClusterImplConfig clusterImplConfig = ClusterImplConfig clusterImplConfig =
new ClusterImplConfig(cluster, edsServiceName, lrsServerName, maxConcurrentRequests, new ClusterImplConfig(cluster, edsServiceName, lrsServerName, maxConcurrentRequests,
dropOverloads, localityPicking, tlsContext); dropOverloads, leafPolicy, tlsContext);
LoadBalancerProvider clusterImplLbProvider = LoadBalancerProvider clusterImplLbProvider =
lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
PolicySelection clusterImplPolicy = PolicySelection clusterImplPolicy =
new PolicySelection(clusterImplLbProvider, clusterImplConfig); new PolicySelection(clusterImplLbProvider, clusterImplConfig);
configs.put(priority, new PriorityChildConfig(clusterImplPolicy, ignoreReresolution)); PriorityChildConfig priorityChildConfig =
new PriorityChildConfig(clusterImplPolicy, true /* ignoreReresolution */);
configs.put(priority, priorityChildConfig);
} }
return configs; return configs;
} }

View File

@ -67,19 +67,17 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
static final class ClusterResolverConfig { static final class ClusterResolverConfig {
// Ordered list of clusters to be resolved. // Ordered list of clusters to be resolved.
final List<DiscoveryMechanism> discoveryMechanisms; final List<DiscoveryMechanism> discoveryMechanisms;
final PolicySelection localityPickingPolicy; // Endpoint-level load balancing policy with config (round_robin or ring_hash).
final PolicySelection endpointPickingPolicy; final PolicySelection lbPolicy;
ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, ClusterResolverConfig(List<DiscoveryMechanism> discoveryMechanisms, PolicySelection lbPolicy) {
PolicySelection localityPickingPolicy, PolicySelection endpointPickingPolicy) {
this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms"); this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms");
this.localityPickingPolicy = checkNotNull(localityPickingPolicy, "localityPickingPolicy"); this.lbPolicy = checkNotNull(lbPolicy, "lbPolicy");
this.endpointPickingPolicy = checkNotNull(endpointPickingPolicy, "endpointPickingPolicy");
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(discoveryMechanisms, localityPickingPolicy, endpointPickingPolicy); return Objects.hash(discoveryMechanisms, lbPolicy);
} }
@Override @Override
@ -92,16 +90,14 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi
} }
ClusterResolverConfig that = (ClusterResolverConfig) o; ClusterResolverConfig that = (ClusterResolverConfig) o;
return discoveryMechanisms.equals(that.discoveryMechanisms) return discoveryMechanisms.equals(that.discoveryMechanisms)
&& localityPickingPolicy.equals(that.localityPickingPolicy) && lbPolicy.equals(that.lbPolicy);
&& endpointPickingPolicy.equals(that.endpointPickingPolicy);
} }
@Override @Override
public String toString() { public String toString() {
return MoreObjects.toStringHelper(this) return MoreObjects.toStringHelper(this)
.add("discoveryMechanisms", discoveryMechanisms) .add("discoveryMechanisms", discoveryMechanisms)
.add("localityPickingPolicy", localityPickingPolicy) .add("lbPolicy", lbPolicy)
.add("endpointPickingPolicy", endpointPickingPolicy)
.toString(); .toString();
} }

View File

@ -178,7 +178,7 @@ abstract class XdsClient {
abstract ClusterType clusterType(); abstract ClusterType clusterType();
// Endpoint-level load balancing policy. // Endpoint-level load balancing policy.
abstract String lbPolicy(); abstract LbPolicy lbPolicy();
// Only valid if lbPolicy is "ring_hash". // Only valid if lbPolicy is "ring_hash".
abstract long minRingSize(); abstract long minRingSize();
@ -251,6 +251,10 @@ abstract class XdsClient {
EDS, LOGICAL_DNS, AGGREGATE EDS, LOGICAL_DNS, AGGREGATE
} }
enum LbPolicy {
ROUND_ROBIN, RING_HASH
}
// FIXME(chengyuanzhang): delete this after UpstreamTlsContext's toString() is fixed. // FIXME(chengyuanzhang): delete this after UpstreamTlsContext's toString() is fixed.
@Override @Override
public final String toString() { public final String toString() {
@ -270,38 +274,37 @@ abstract class XdsClient {
@AutoValue.Builder @AutoValue.Builder
abstract static class Builder { abstract static class Builder {
// Private do not use. // Private, use one of the static factory methods instead.
protected abstract Builder clusterName(String clusterName); protected abstract Builder clusterName(String clusterName);
// Private do not use. // Private, use one of the static factory methods instead.
protected abstract Builder clusterType(ClusterType clusterType); protected abstract Builder clusterType(ClusterType clusterType);
// Private do not use. abstract Builder lbPolicy(LbPolicy lbPolicy);
protected abstract Builder lbPolicy(String lbPolicy);
Builder lbPolicy(String lbPolicy, long minRingSize, long maxRingSize) { Builder lbPolicy(LbPolicy lbPolicy, long minRingSize, long maxRingSize) {
return this.lbPolicy(lbPolicy).minRingSize(minRingSize).maxRingSize(maxRingSize); return this.lbPolicy(lbPolicy).minRingSize(minRingSize).maxRingSize(maxRingSize);
} }
// Private do not use. // Private, use lbPolicy(LbPolicy, long, long).
protected abstract Builder minRingSize(long minRingSize); protected abstract Builder minRingSize(long minRingSize);
// Private do not use. // Private, use lbPolicy(.LbPolicy, long, long)
protected abstract Builder maxRingSize(long maxRingSize); protected abstract Builder maxRingSize(long maxRingSize);
// Private do not use. // Private, use CdsUpdate.forEds() instead.
protected abstract Builder edsServiceName(String edsServiceName); protected abstract Builder edsServiceName(String edsServiceName);
// Private do not use. // Private, use one of the static factory methods instead.
protected abstract Builder lrsServerName(String lrsServerName); protected abstract Builder lrsServerName(String lrsServerName);
// Private do not use. // Private, use one of the static factory methods instead.
protected abstract Builder maxConcurrentRequests(Long maxConcurrentRequests); protected abstract Builder maxConcurrentRequests(Long maxConcurrentRequests);
// Private do not use. // Private, use one of the static factory methods instead.
protected abstract Builder upstreamTlsContext(UpstreamTlsContext upstreamTlsContext); protected abstract Builder upstreamTlsContext(UpstreamTlsContext upstreamTlsContext);
// Private do not use. // Private, use CdsUpdate.forAggregate() instead.
protected abstract Builder prioritizedClusterNames(List<String> prioritizedClusterNames); protected abstract Builder prioritizedClusterNames(List<String> prioritizedClusterNames);
abstract CdsUpdate build(); abstract CdsUpdate build();

View File

@ -18,7 +18,6 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -47,6 +46,9 @@ import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil; import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -118,8 +120,8 @@ public class CdsLoadBalancer2Test {
when(helper.getSynchronizationContext()).thenReturn(syncContext); when(helper.getSynchronizationContext()).thenReturn(syncContext);
lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_RESOLVER_POLICY_NAME)); lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_RESOLVER_POLICY_NAME));
lbRegistry.register(new FakeLoadBalancerProvider(WEIGHTED_TARGET_POLICY_NAME));
lbRegistry.register(new FakeLoadBalancerProvider("round_robin")); lbRegistry.register(new FakeLoadBalancerProvider("round_robin"));
lbRegistry.register(new FakeLoadBalancerProvider("ring_hash"));
loadBalancer = new CdsLoadBalancer2(helper, lbRegistry); loadBalancer = new CdsLoadBalancer2(helper, lbRegistry);
loadBalancer.handleResolvedAddresses( loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder() ResolvedAddresses.newBuilder()
@ -144,8 +146,10 @@ public class CdsLoadBalancer2Test {
@Test @Test
public void discoverTopLevelEdsCluster() { public void discoverTopLevelEdsCluster() {
xdsClient.deliverEdsCluster(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, CdsUpdate update =
upstreamTlsContext); CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1); assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME); assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME);
@ -154,15 +158,15 @@ public class CdsLoadBalancer2Test {
DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME,
LRS_SERVER_NAME, 100L, upstreamTlsContext); LRS_SERVER_NAME, 100L, upstreamTlsContext);
assertThat(childLbConfig.localityPickingPolicy.getProvider().getPolicyName()) assertThat(childLbConfig.lbPolicy.getProvider().getPolicyName()).isEqualTo("round_robin");
.isEqualTo(WEIGHTED_TARGET_POLICY_NAME);
assertThat(childLbConfig.endpointPickingPolicy.getProvider().getPolicyName())
.isEqualTo("round_robin");
} }
@Test @Test
public void discoverTopLevelLogicalDnsCluster() { public void discoverTopLevelLogicalDnsCluster() {
xdsClient.deliverLogicalDnsCluster(CLUSTER, LRS_SERVER_NAME, 100L, upstreamTlsContext); CdsUpdate update =
CdsUpdate.forLogicalDns(CLUSTER, LRS_SERVER_NAME, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1); assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME); assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME);
@ -171,10 +175,7 @@ public class CdsLoadBalancer2Test {
DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.LOGICAL_DNS, null, assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.LOGICAL_DNS, null,
LRS_SERVER_NAME, 100L, upstreamTlsContext); LRS_SERVER_NAME, 100L, upstreamTlsContext);
assertThat(childLbConfig.localityPickingPolicy.getProvider().getPolicyName()) assertThat(childLbConfig.lbPolicy.getProvider().getPolicyName()).isEqualTo("round_robin");
.isEqualTo(WEIGHTED_TARGET_POLICY_NAME);
assertThat(childLbConfig.endpointPickingPolicy.getProvider().getPolicyName())
.isEqualTo("round_robin");
} }
@Test @Test
@ -189,7 +190,10 @@ public class CdsLoadBalancer2Test {
@Test @Test
public void nonAggregateCluster_resourceUpdate() { public void nonAggregateCluster_resourceUpdate() {
xdsClient.deliverEdsCluster(CLUSTER, null, null, 100L, upstreamTlsContext); CdsUpdate update =
CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1); assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
@ -197,7 +201,9 @@ public class CdsLoadBalancer2Test {
assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, null, null, 100L, assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, null, null, 100L,
upstreamTlsContext); upstreamTlsContext);
xdsClient.deliverEdsCluster(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, null); update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
childLbConfig = (ClusterResolverConfig) childBalancer.config; childLbConfig = (ClusterResolverConfig) childBalancer.config;
instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME,
@ -206,7 +212,10 @@ public class CdsLoadBalancer2Test {
@Test @Test
public void nonAggregateCluster_resourceRevoked() { public void nonAggregateCluster_resourceRevoked() {
xdsClient.deliverLogicalDnsCluster(CLUSTER, null, 100L, upstreamTlsContext); CdsUpdate update =
CdsUpdate.forLogicalDns(CLUSTER, null, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1); assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
@ -225,27 +234,40 @@ public class CdsLoadBalancer2Test {
} }
@Test @Test
public void discoveryAggregateCluster() { public void discoverAggregateCluster() {
String cluster1 = "cluster-01.googleapis.com"; String cluster1 = "cluster-01.googleapis.com";
String cluster2 = "cluster-02.googleapis.com"; String cluster2 = "cluster-02.googleapis.com";
// CLUSTER (aggr.) -> [cluster1 (aggr.), cluster2 (logical DNS)] // CLUSTER (aggr.) -> [cluster1 (aggr.), cluster2 (logical DNS)]
xdsClient.deliverAggregateCluster(CLUSTER, Arrays.asList(cluster1, cluster2)); CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2))
.lbPolicy(LbPolicy.RING_HASH, 100L, 1000L).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
assertThat(childBalancers).isEmpty(); assertThat(childBalancers).isEmpty();
String cluster3 = "cluster-03.googleapis.com"; String cluster3 = "cluster-03.googleapis.com";
String cluster4 = "cluster-04.googleapis.com"; String cluster4 = "cluster-04.googleapis.com";
// cluster1 (aggr.) -> [cluster3 (EDS), cluster4 (EDS)] // cluster1 (aggr.) -> [cluster3 (EDS), cluster4 (EDS)]
xdsClient.deliverAggregateCluster(cluster1, Arrays.asList(cluster3, cluster4)); CdsUpdate update1 =
CdsUpdate.forAggregate(cluster1, Arrays.asList(cluster3, cluster4))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(cluster1, update1);
assertThat(xdsClient.watchers.keySet()).containsExactly( assertThat(xdsClient.watchers.keySet()).containsExactly(
CLUSTER, cluster1, cluster2, cluster3, cluster4); CLUSTER, cluster1, cluster2, cluster3, cluster4);
assertThat(childBalancers).isEmpty(); assertThat(childBalancers).isEmpty();
xdsClient.deliverEdsCluster(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, CdsUpdate update3 =
upstreamTlsContext); CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(cluster3, update3);
assertThat(childBalancers).isEmpty(); assertThat(childBalancers).isEmpty();
xdsClient.deliverLogicalDnsCluster(cluster2, null, 100L, null); CdsUpdate update2 =
CdsUpdate.forLogicalDns(cluster2, null, 100L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(cluster2, update2);
assertThat(childBalancers).isEmpty(); assertThat(childBalancers).isEmpty();
xdsClient.deliverEdsCluster(cluster4, null, LRS_SERVER_NAME, 300L, CdsUpdate update4 =
null); CdsUpdate.forEds(cluster4, null, LRS_SERVER_NAME, 300L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(cluster4, update4);
assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME); assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME);
@ -258,17 +280,20 @@ public class CdsLoadBalancer2Test {
DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext); DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext);
assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(2), cluster4, assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(2), cluster4,
DiscoveryMechanism.Type.EDS, null, LRS_SERVER_NAME, 300L, null); DiscoveryMechanism.Type.EDS, null, LRS_SERVER_NAME, 300L, null);
assertThat(childLbConfig.localityPickingPolicy.getProvider().getPolicyName()) assertThat(childLbConfig.lbPolicy.getProvider().getPolicyName())
.isEqualTo(WEIGHTED_TARGET_POLICY_NAME); .isEqualTo("ring_hash"); // dominated by top-level cluster's config
assertThat(childLbConfig.endpointPickingPolicy.getProvider().getPolicyName()) assertThat(((RingHashConfig) childLbConfig.lbPolicy.getConfig()).minRingSize).isEqualTo(100L);
.isEqualTo("round_robin"); assertThat(((RingHashConfig) childLbConfig.lbPolicy.getConfig()).maxRingSize).isEqualTo(1000L);
} }
@Test @Test
public void aggregateCluster_noNonAggregateClusterExits_returnErrorPicker() { public void aggregateCluster_noNonAggregateClusterExits_returnErrorPicker() {
String cluster1 = "cluster-01.googleapis.com"; String cluster1 = "cluster-01.googleapis.com";
// CLUSTER (aggr.) -> [cluster1 (EDS)] // CLUSTER (aggr.) -> [cluster1 (EDS)]
xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1)); CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
xdsClient.deliverResourceNotExist(cluster1); xdsClient.deliverResourceNotExist(cluster1);
verify(helper).updateBalancingState( verify(helper).updateBalancingState(
@ -283,11 +308,19 @@ public class CdsLoadBalancer2Test {
String cluster1 = "cluster-01.googleapis.com"; String cluster1 = "cluster-01.googleapis.com";
String cluster2 = "cluster-02.googleapis.com"; String cluster2 = "cluster-02.googleapis.com";
// CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)] // CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)]
xdsClient.deliverAggregateCluster(CLUSTER, Arrays.asList(cluster1, cluster2)); CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
xdsClient.deliverLogicalDnsCluster(cluster2, LRS_SERVER_NAME, 100L, null); CdsUpdate update1 =
xdsClient.deliverEdsCluster(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext)
upstreamTlsContext); .lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(cluster1, update1);
CdsUpdate update2 =
CdsUpdate.forLogicalDns(cluster2, LRS_SERVER_NAME, 100L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(cluster2, update2);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
assertThat(childLbConfig.discoveryMechanisms).hasSize(2); assertThat(childLbConfig.discoveryMechanisms).hasSize(2);
@ -321,11 +354,19 @@ public class CdsLoadBalancer2Test {
String cluster1 = "cluster-01.googleapis.com"; String cluster1 = "cluster-01.googleapis.com";
String cluster2 = "cluster-02.googleapis.com"; String cluster2 = "cluster-02.googleapis.com";
// CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)] // CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)]
xdsClient.deliverAggregateCluster(CLUSTER, Arrays.asList(cluster1, cluster2)); CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
xdsClient.deliverLogicalDnsCluster(cluster2, LRS_SERVER_NAME, 100L, null); CdsUpdate update1 =
xdsClient.deliverEdsCluster(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext)
upstreamTlsContext); .lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(cluster1, update1);
CdsUpdate update2 =
CdsUpdate.forLogicalDns(cluster2, LRS_SERVER_NAME, 100L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(cluster2, update2);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
assertThat(childLbConfig.discoveryMechanisms).hasSize(2); assertThat(childLbConfig.discoveryMechanisms).hasSize(2);
@ -349,20 +390,31 @@ public class CdsLoadBalancer2Test {
public void aggregateCluster_intermediateClusterChanges() { public void aggregateCluster_intermediateClusterChanges() {
String cluster1 = "cluster-01.googleapis.com"; String cluster1 = "cluster-01.googleapis.com";
// CLUSTER (aggr.) -> [cluster1] // CLUSTER (aggr.) -> [cluster1]
xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1)); CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
// CLUSTER (aggr.) -> [cluster2 (aggr.)] // CLUSTER (aggr.) -> [cluster2 (aggr.)]
String cluster2 = "cluster-02.googleapis.com"; String cluster2 = "cluster-02.googleapis.com";
xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster2)); update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster2))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2);
// cluster2 (aggr.) -> [cluster3 (EDS)] // cluster2 (aggr.) -> [cluster3 (EDS)]
String cluster3 = "cluster-03.googleapis.com"; String cluster3 = "cluster-03.googleapis.com";
xdsClient.deliverAggregateCluster(cluster2, Collections.singletonList(cluster3)); CdsUpdate update2 =
CdsUpdate.forAggregate(cluster2, Collections.singletonList(cluster3))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(cluster2, update2);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3);
xdsClient.deliverEdsCluster(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, CdsUpdate update3 =
upstreamTlsContext); CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(cluster3, update3);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
assertThat(childLbConfig.discoveryMechanisms).hasSize(1); assertThat(childLbConfig.discoveryMechanisms).hasSize(1);
@ -386,7 +438,10 @@ public class CdsLoadBalancer2Test {
public void aggregateCluster_discoveryErrorBeforeChildLbCreated_returnErrorPicker() { public void aggregateCluster_discoveryErrorBeforeChildLbCreated_returnErrorPicker() {
String cluster1 = "cluster-01.googleapis.com"; String cluster1 = "cluster-01.googleapis.com";
// CLUSTER (aggr.) -> [cluster1] // CLUSTER (aggr.) -> [cluster1]
xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1)); CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
Status error = Status.RESOURCE_EXHAUSTED.withDescription("OOM"); Status error = Status.RESOURCE_EXHAUSTED.withDescription("OOM");
xdsClient.deliverError(error); xdsClient.deliverError(error);
@ -400,8 +455,14 @@ public class CdsLoadBalancer2Test {
public void aggregateCluster_discoveryErrorAfterChildLbCreated_propagateToChildLb() { public void aggregateCluster_discoveryErrorAfterChildLbCreated_propagateToChildLb() {
String cluster1 = "cluster-01.googleapis.com"; String cluster1 = "cluster-01.googleapis.com";
// CLUSTER (aggr.) -> [cluster1 (logical DNS)] // CLUSTER (aggr.) -> [cluster1 (logical DNS)]
xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1)); CdsUpdate update =
xdsClient.deliverLogicalDnsCluster(cluster1, LRS_SERVER_NAME, 200L, null); CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
CdsUpdate update1 =
CdsUpdate.forLogicalDns(cluster1, LRS_SERVER_NAME, 200L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(cluster1, update1);
FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers); FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childLb.config; ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childLb.config;
assertThat(childLbConfig.discoveryMechanisms).hasSize(1); assertThat(childLbConfig.discoveryMechanisms).hasSize(1);
@ -423,8 +484,10 @@ public class CdsLoadBalancer2Test {
@Test @Test
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() { public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
xdsClient.deliverEdsCluster(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, CdsUpdate update =
upstreamTlsContext); CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.shutdown).isFalse(); assertThat(childBalancer.shutdown).isFalse();
loadBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription("unreachable")); loadBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription("unreachable"));
@ -541,31 +604,8 @@ public class CdsLoadBalancer2Test {
watchers.remove(resourceName); watchers.remove(resourceName);
} }
private void deliverEdsCluster(String clusterName, @Nullable String edsServiceName, private void deliverCdsUpdate(String clusterName, CdsUpdate update) {
@Nullable String lrsServerName, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext) {
if (watchers.containsKey(clusterName)) { if (watchers.containsKey(clusterName)) {
CdsUpdate update = CdsUpdate.forEds(
clusterName, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext)
.lbPolicy("round_robin").build();
watchers.get(clusterName).onChanged(update);
}
}
private void deliverLogicalDnsCluster(String clusterName, @Nullable String lrsServerName,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext) {
if (watchers.containsKey(clusterName)) {
CdsUpdate update = CdsUpdate.forLogicalDns(
clusterName, lrsServerName, maxConcurrentRequests, tlsContext)
.lbPolicy("round_robin").build();
watchers.get(clusterName).onChanged(update);
}
}
private void deliverAggregateCluster(String clusterName, List<String> clusters) {
if (watchers.containsKey(clusterName)) {
CdsUpdate update = CdsUpdate.forAggregate(clusterName, clusters)
.lbPolicy("round_robin").build();
watchers.get(clusterName).onChanged(update); watchers.get(clusterName).onChanged(update);
} }
} }

View File

@ -62,6 +62,7 @@ import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate; import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy;
import io.grpc.xds.XdsClient.EdsResourceWatcher; import io.grpc.xds.XdsClient.EdsResourceWatcher;
import io.grpc.xds.XdsClient.EdsUpdate; import io.grpc.xds.XdsClient.EdsUpdate;
import io.grpc.xds.XdsClient.LdsResourceWatcher; import io.grpc.xds.XdsClient.LdsResourceWatcher;
@ -1132,7 +1133,7 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull(); assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1157,7 +1158,7 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull(); assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo("ring_hash"); assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.RING_HASH);
assertThat(cdsUpdate.minRingSize()).isEqualTo(10L); assertThat(cdsUpdate.minRingSize()).isEqualTo(10L);
assertThat(cdsUpdate.maxRingSize()).isEqualTo(100L); assertThat(cdsUpdate.maxRingSize()).isEqualTo(100L);
assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.lrsServerName()).isNull();
@ -1183,7 +1184,7 @@ public abstract class ClientXdsClientTestBase {
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.AGGREGATE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.AGGREGATE);
assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(cdsUpdate.prioritizedClusterNames()).containsExactlyElementsIn(candidates).inOrder(); assertThat(cdsUpdate.prioritizedClusterNames()).containsExactlyElementsIn(candidates).inOrder();
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, clusterAggregate, VERSION_1, TIME_INCREMENT); verifyResourceMetadataAcked(CDS, CDS_RESOURCE, clusterAggregate, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
@ -1204,7 +1205,7 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull(); assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isEqualTo(200L); assertThat(cdsUpdate.maxConcurrentRequests()).isEqualTo(200L);
assertThat(cdsUpdate.upstreamTlsContext()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1268,7 +1269,7 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull(); assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1306,7 +1307,7 @@ public abstract class ClientXdsClientTestBase {
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS);
assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1323,7 +1324,7 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); assertThat(cdsUpdate.lrsServerName()).isEqualTo("");
assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1344,7 +1345,7 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull(); assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1392,7 +1393,7 @@ public abstract class ClientXdsClientTestBase {
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS);
assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1401,7 +1402,7 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo); assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); assertThat(cdsUpdate.lrsServerName()).isEqualTo("");
assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1410,7 +1411,7 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo); assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); assertThat(cdsUpdate.lrsServerName()).isEqualTo("");
assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();

View File

@ -65,6 +65,7 @@ import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil; import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
@ -113,6 +114,12 @@ public class ClusterResolverLoadBalancerTest {
CommonTlsContextTestsUtil.CLIENT_KEY_FILE, CommonTlsContextTestsUtil.CLIENT_KEY_FILE,
CommonTlsContextTestsUtil.CLIENT_PEM_FILE, CommonTlsContextTestsUtil.CLIENT_PEM_FILE,
CommonTlsContextTestsUtil.CA_PEM_FILE); CommonTlsContextTestsUtil.CA_PEM_FILE);
private final DiscoveryMechanism edsDiscoveryMechanism1 =
DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, tlsContext);
private final DiscoveryMechanism edsDiscoveryMechanism2 =
DiscoveryMechanism.forEds(CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L, tlsContext);
private final DiscoveryMechanism logicalDnsDiscoveryMechanism =
DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, LRS_SERVER_NAME, 300L, null);
private final SynchronizationContext syncContext = new SynchronizationContext( private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() { new Thread.UncaughtExceptionHandler() {
@ -126,8 +133,8 @@ public class ClusterResolverLoadBalancerTest {
private final NameResolverRegistry nsRegistry = new NameResolverRegistry(); private final NameResolverRegistry nsRegistry = new NameResolverRegistry();
private final PolicySelection roundRobin = private final PolicySelection roundRobin =
new PolicySelection(new FakeLoadBalancerProvider("round_robin"), null); new PolicySelection(new FakeLoadBalancerProvider("round_robin"), null);
private final PolicySelection weightedTarget = private final PolicySelection ringHash = new PolicySelection(
new PolicySelection(new FakeLoadBalancerProvider(WEIGHTED_TARGET_POLICY_NAME), null); new FakeLoadBalancerProvider("ring_hash"), new RingHashConfig(10L, 100L));
private final List<FakeLoadBalancer> childBalancers = new ArrayList<>(); private final List<FakeLoadBalancer> childBalancers = new ArrayList<>();
private final List<FakeNameResolver> resolvers = new ArrayList<>(); private final List<FakeNameResolver> resolvers = new ArrayList<>();
private final FakeXdsClient xdsClient = new FakeXdsClient(); private final FakeXdsClient xdsClient = new FakeXdsClient();
@ -165,6 +172,7 @@ public class ClusterResolverLoadBalancerTest {
lbRegistry.register(new FakeLoadBalancerProvider(PRIORITY_POLICY_NAME)); lbRegistry.register(new FakeLoadBalancerProvider(PRIORITY_POLICY_NAME));
lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_IMPL_POLICY_NAME)); lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_IMPL_POLICY_NAME));
lbRegistry.register(new FakeLoadBalancerProvider(WEIGHTED_TARGET_POLICY_NAME));
lbRegistry.register( lbRegistry.register(
new FakeLoadBalancerProvider("pick_first")); // needed by logical_dns new FakeLoadBalancerProvider("pick_first")); // needed by logical_dns
URI targetUri = new URI(AUTHORITY); URI targetUri = new URI(AUTHORITY);
@ -198,9 +206,66 @@ public class ClusterResolverLoadBalancerTest {
assertThat(fakeClock.getPendingTasks()).isEmpty(); assertThat(fakeClock.getPendingTasks()).isEmpty();
} }
@Test
public void edsClustersWithRingHashEndpointLbPolicy() {
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(edsDiscoveryMechanism1), ringHash);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(childBalancers).isEmpty();
// One priority with two localities of different weights.
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
LocalityLbEndpoints localityLbEndpoints1 =
LocalityLbEndpoints.create(
Collections.singletonList(
LbEndpoint.create(endpoint1, 100 /* loadBalancingWeight */, true)),
10 /* localityWeight */, 1 /* priority */);
LocalityLbEndpoints localityLbEndpoints2 =
LocalityLbEndpoints.create(
Collections.singletonList(
LbEndpoint.create(endpoint2, 60 /* loadBalancingWeight */, true)),
50 /* localityWeight */, 1 /* priority */);
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1,
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.addresses).hasSize(2);
EquivalentAddressGroup addr1 = childBalancer.addresses.get(0);
EquivalentAddressGroup addr2 = childBalancer.addresses.get(1);
assertThat(addr1.getAddresses()).isEqualTo(endpoint1.getAddresses());
assertThat(addr1.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT))
.isEqualTo(10 * 100);
assertThat(addr2.getAddresses()).isEqualTo(endpoint2.getAddresses());
assertThat(addr2.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT))
.isEqualTo(50 * 60);
assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config;
assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[priority1]");
PriorityChildConfig priorityChildConfig =
Iterables.getOnlyElement(priorityLbConfig.childConfigs.values());
assertThat(priorityChildConfig.ignoreReresolution).isTrue();
assertThat(priorityChildConfig.policySelection.getProvider().getPolicyName())
.isEqualTo(CLUSTER_IMPL_POLICY_NAME);
ClusterImplConfig clusterImplConfig =
(ClusterImplConfig) priorityChildConfig.policySelection.getConfig();
assertClusterImplConfig(clusterImplConfig, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L,
tlsContext, Collections.<DropOverload>emptyList(), "ring_hash");
RingHashConfig ringHashConfig =
(RingHashConfig) clusterImplConfig.childPolicy.getConfig();
assertThat(ringHashConfig.minRingSize).isEqualTo(10L);
assertThat(ringHashConfig.maxRingSize).isEqualTo(100L);
}
@Test @Test
public void onlyEdsClusters_receivedEndpoints() { public void onlyEdsClusters_receivedEndpoints() {
deliverConfigWithEdsClusters(); // CLUSTER1 and CLUSTER2 ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
assertThat(childBalancers).isEmpty();
// CLUSTER1 has priority 1 (priority3), which has locality 2, which has endpoint3. // CLUSTER1 has priority 1 (priority3), which has locality 2, which has endpoint3.
// CLUSTER2 has priority 1 (priority1) and 2 (priority2); priority1 has locality1, // CLUSTER2 has priority 1 (priority1) and 2 (priority2); priority1 has locality1,
// which has endpoint1 and endpoint2; priority2 has locality3, which has endpoint4. // which has endpoint1 and endpoint2; priority2 has locality3, which has endpoint4.
@ -209,11 +274,19 @@ public class ClusterResolverLoadBalancerTest {
EquivalentAddressGroup endpoint3 = makeAddress("endpoint-addr-3"); EquivalentAddressGroup endpoint3 = makeAddress("endpoint-addr-3");
EquivalentAddressGroup endpoint4 = makeAddress("endpoint-addr-4"); EquivalentAddressGroup endpoint4 = makeAddress("endpoint-addr-4");
LocalityLbEndpoints localityLbEndpoints1 = LocalityLbEndpoints localityLbEndpoints1 =
buildLocalityLbEndpoints(1, 70, ImmutableMap.of(endpoint1, true, endpoint2, true)); LocalityLbEndpoints.create(
Arrays.asList(
LbEndpoint.create(endpoint1, 100, true),
LbEndpoint.create(endpoint2, 100, true)),
70 /* localityWeight */, 1 /* priority */);
LocalityLbEndpoints localityLbEndpoints2 = LocalityLbEndpoints localityLbEndpoints2 =
buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint3, true)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true)),
10 /* localityWeight */, 1 /* priority */);
LocalityLbEndpoints localityLbEndpoints3 = LocalityLbEndpoints localityLbEndpoints3 =
buildLocalityLbEndpoints(2, 20, Collections.singletonMap(endpoint4, true)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint4, 100, true)),
20 /* localityWeight */, 2 /* priority */);
String priority1 = CLUSTER2 + "[priority1]"; String priority1 = CLUSTER2 + "[priority1]";
String priority2 = CLUSTER2 + "[priority2]"; String priority2 = CLUSTER2 + "[priority2]";
String priority3 = CLUSTER1 + "[priority1]"; String priority3 = CLUSTER1 + "[priority1]";
@ -291,7 +364,11 @@ public class ClusterResolverLoadBalancerTest {
@Test @Test
public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() { public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() {
deliverConfigWithEdsClusters(); // CLUSTER1 and CLUSTER2 ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
assertThat(childBalancers).isEmpty();
reset(helper); reset(helper);
xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1); xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1);
verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
@ -308,14 +385,22 @@ public class ClusterResolverLoadBalancerTest {
@Test @Test
public void onlyEdsClusters_allResourcesRevoked_shutDownChildLbPolicy() { public void onlyEdsClusters_allResourcesRevoked_shutDownChildLbPolicy() {
deliverConfigWithEdsClusters(); // CLUSTER1 and CLUSTER2 ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
assertThat(childBalancers).isEmpty();
reset(helper); reset(helper);
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
LocalityLbEndpoints localityLbEndpoints1 = LocalityLbEndpoints localityLbEndpoints1 =
buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true)),
10 /* localityWeight */, 1 /* priority */);
LocalityLbEndpoints localityLbEndpoints2 = LocalityLbEndpoints localityLbEndpoints2 =
buildLocalityLbEndpoints(2, 20, Collections.singletonMap(endpoint2, true)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true)),
20 /* localityWeight */, 2 /* priority */);
xdsClient.deliverClusterLoadAssignment( xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints1)); EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints1));
xdsClient.deliverClusterLoadAssignment( xdsClient.deliverClusterLoadAssignment(
@ -333,25 +418,19 @@ public class ClusterResolverLoadBalancerTest {
assertPicker(pickerCaptor.getValue(), expectedError, null); assertPicker(pickerCaptor.getValue(), expectedError, null);
} }
private void deliverConfigWithEdsClusters() {
DiscoveryMechanism instance1 =
DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, tlsContext);
DiscoveryMechanism instance2 =
DiscoveryMechanism.forEds(CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L, tlsContext);
ClusterResolverConfig config =
new ClusterResolverConfig(Arrays.asList(instance1, instance2), weightedTarget, roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2);
assertThat(childBalancers).isEmpty();
}
@Test @Test
public void handleEdsResource_ignoreUnhealthyEndpoints() { public void handleEdsResource_ignoreUnhealthyEndpoints() {
deliverConfigWithSingleEdsCluster(); // CLUSTER1 ClusterResolverConfig config =
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
deliverLbConfig(config);
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints localityLbEndpoints =
buildLocalityLbEndpoints(1, 10, ImmutableMap.of(endpoint1, false, endpoint2, true)); LocalityLbEndpoints.create(
Arrays.asList(
LbEndpoint.create(endpoint1, 100, false /* isHealthy */),
LbEndpoint.create(endpoint2, 100, true /* isHealthy */)),
10 /* localityWeight */, 1 /* priority */);
xdsClient.deliverClusterLoadAssignment( xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
@ -361,13 +440,19 @@ public class ClusterResolverLoadBalancerTest {
@Test @Test
public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() { public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() {
deliverConfigWithSingleEdsCluster(); // CLUSTER1 ClusterResolverConfig config =
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
deliverLbConfig(config);
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
LocalityLbEndpoints localityLbEndpoints1 = LocalityLbEndpoints localityLbEndpoints1 =
buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, false)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */)),
10 /* localityWeight */, 1 /* priority */);
LocalityLbEndpoints localityLbEndpoints2 = LocalityLbEndpoints localityLbEndpoints2 =
buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint2, true)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */)),
10 /* localityWeight */, 1 /* priority */);
String priority = CLUSTER1 + "[priority1]"; String priority = CLUSTER1 + "[priority1]";
xdsClient.deliverClusterLoadAssignment( xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, EDS_SERVICE_NAME1,
@ -385,29 +470,38 @@ public class ClusterResolverLoadBalancerTest {
@Test @Test
public void handleEdsResource_ignorePrioritiesWithNoHealthyEndpoints() { public void handleEdsResource_ignorePrioritiesWithNoHealthyEndpoints() {
deliverConfigWithSingleEdsCluster(); // CLUSTER1 ClusterResolverConfig config =
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
deliverLbConfig(config);
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
LocalityLbEndpoints localityLbEndpoints1 = LocalityLbEndpoints localityLbEndpoints1 =
buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, false)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */)),
10 /* localityWeight */, 1 /* priority */);
LocalityLbEndpoints localityLbEndpoints2 = LocalityLbEndpoints localityLbEndpoints2 =
buildLocalityLbEndpoints(2, 10, Collections.singletonMap(endpoint2, true)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint2, 200, true /* isHealthy */)),
10 /* localityWeight */, 2 /* priority */);
String priority2 = CLUSTER1 + "[priority2]"; String priority2 = CLUSTER1 + "[priority2]";
xdsClient.deliverClusterLoadAssignment( xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, EDS_SERVICE_NAME1,
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2)); ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
PriorityLbConfig config = (PriorityLbConfig) childBalancer.config; assertThat(((PriorityLbConfig) childBalancer.config).priorities).containsExactly(priority2);
assertThat(config.priorities).containsExactly(priority2);
} }
@Test @Test
public void handleEdsResource_noHealthyEndpoint() { public void handleEdsResource_noHealthyEndpoint() {
deliverConfigWithSingleEdsCluster(); // CLUSTER1 ClusterResolverConfig config =
new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin);
deliverLbConfig(config);
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints localityLbEndpoints =
buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint, false)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint, 100, false /* isHealthy */)),
10 /* localityWeight */, 1 /* priority */);
xdsClient.deliverClusterLoadAssignment(EDS_SERVICE_NAME1, xdsClient.deliverClusterLoadAssignment(EDS_SERVICE_NAME1,
Collections.singletonMap(locality1, localityLbEndpoints)); // single endpoint, unhealthy Collections.singletonMap(locality1, localityLbEndpoints)); // single endpoint, unhealthy
@ -418,19 +512,13 @@ public class ClusterResolverLoadBalancerTest {
Status.UNAVAILABLE.withDescription("No usable endpoint"), null); Status.UNAVAILABLE.withDescription("No usable endpoint"), null);
} }
private void deliverConfigWithSingleEdsCluster() {
DiscoveryMechanism instance =
DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, null);
ClusterResolverConfig config =
new ClusterResolverConfig(Collections.singletonList(instance), weightedTarget, roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(childBalancers).isEmpty();
}
@Test @Test
public void onlyLogicalDnsCluster_endpointsResolved() { public void onlyLogicalDnsCluster_endpointsResolved() {
deliverConfigWithSingleLogicalDnsCluster(); ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
deliverLbConfig(config);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
@ -447,14 +535,18 @@ public class ClusterResolverLoadBalancerTest {
.isEqualTo(CLUSTER_IMPL_POLICY_NAME); .isEqualTo(CLUSTER_IMPL_POLICY_NAME);
ClusterImplConfig clusterImplConfig = ClusterImplConfig clusterImplConfig =
(ClusterImplConfig) priorityChildConfig.policySelection.getConfig(); (ClusterImplConfig) priorityChildConfig.policySelection.getConfig();
assertClusterImplConfig(clusterImplConfig, CLUSTER_DNS, null, LRS_SERVER_NAME, 100L, null, assertClusterImplConfig(clusterImplConfig, CLUSTER_DNS, null, LRS_SERVER_NAME, 300L, null,
Collections.<DropOverload>emptyList(), "pick_first"); Collections.<DropOverload>emptyList(), "pick_first");
assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), childBalancer.addresses); assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), childBalancer.addresses);
} }
@Test @Test
public void onlyLogicalDnsCluster_handleRefreshNameResolution() { public void onlyLogicalDnsCluster_handleRefreshNameResolution() {
deliverConfigWithSingleLogicalDnsCluster(); ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
deliverLbConfig(config);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
@ -470,7 +562,11 @@ public class ClusterResolverLoadBalancerTest {
public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() { public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider, InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider,
backoffPolicy1, backoffPolicy2); backoffPolicy1, backoffPolicy2);
deliverConfigWithSingleLogicalDnsCluster(); ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
deliverLbConfig(config);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server"); Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server");
resolver.deliverError(error); resolver.deliverError(error);
@ -513,7 +609,11 @@ public class ClusterResolverLoadBalancerTest {
@Test @Test
public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() { public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() {
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
deliverConfigWithSingleLogicalDnsCluster(); ClusterResolverConfig config = new ClusterResolverConfig(
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
deliverLbConfig(config);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr"); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr");
FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
resolver.deliverEndpointAddresses(Collections.singletonList(endpoint)); resolver.deliverEndpointAddresses(Collections.singletonList(endpoint));
@ -547,26 +647,23 @@ public class ClusterResolverLoadBalancerTest {
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
} }
private void deliverConfigWithSingleLogicalDnsCluster() {
DiscoveryMechanism instance =
DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, LRS_SERVER_NAME, 100L, null);
ClusterResolverConfig config =
new ClusterResolverConfig(Collections.singletonList(instance), weightedTarget, roundRobin);
deliverLbConfig(config);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
}
@Test @Test
public void edsClustersAndLogicalDnsCluster_receivedEndpoints() { public void edsClustersAndLogicalDnsCluster_receivedEndpoints() {
deliverConfigWithEdsAndLogicalDnsClusters(); // CLUSTER1 and CLUSTER_DNS ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); // DNS endpoint EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); // DNS endpoint
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); // DNS endpoint EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); // DNS endpoint
EquivalentAddressGroup endpoint3 = makeAddress("endpoint-addr-3"); // EDS endpoint EquivalentAddressGroup endpoint3 = makeAddress("endpoint-addr-3"); // EDS endpoint
FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2)); resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints localityLbEndpoints =
buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint3, true)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint3, 100, true)),
10 /* localityWeight */, 1 /* priority */);
xdsClient.deliverClusterLoadAssignment( xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
@ -587,7 +684,12 @@ public class ClusterResolverLoadBalancerTest {
@Test @Test
public void noEdsResourceExists_useDnsResolutionResults() { public void noEdsResourceExists_useDnsResolutionResults() {
deliverConfigWithEdsAndLogicalDnsClusters(); ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
reset(helper); reset(helper);
xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1); xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1);
verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
@ -609,11 +711,18 @@ public class ClusterResolverLoadBalancerTest {
@Test @Test
public void edsResourceRevoked_dnsResolutionError_shutDownChildLbPolicyAndReturnErrorPicker() { public void edsResourceRevoked_dnsResolutionError_shutDownChildLbPolicyAndReturnErrorPicker() {
deliverConfigWithEdsAndLogicalDnsClusters(); ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
reset(helper); reset(helper);
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints localityLbEndpoints =
buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint, true)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint, 100, true)),
10 /* localityWeight */, 1 /* priority */);
xdsClient.deliverClusterLoadAssignment( xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
@ -634,11 +743,18 @@ public class ClusterResolverLoadBalancerTest {
@Test @Test
public void resolutionErrorAfterChildLbCreated_propagateErrorIfAllClustersEncounterError() { public void resolutionErrorAfterChildLbCreated_propagateErrorIfAllClustersEncounterError() {
deliverConfigWithEdsAndLogicalDnsClusters(); ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
reset(helper); reset(helper);
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1");
LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints localityLbEndpoints =
buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint, true)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint, 100, true)),
10 /* localityWeight */, 1 /* priority */);
xdsClient.deliverClusterLoadAssignment( xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); // child LB created FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); // child LB created
@ -656,7 +772,12 @@ public class ClusterResolverLoadBalancerTest {
@Test @Test
public void resolutionErrorBeforeChildLbCreated_returnErrorPickerIfAllClustersEncounterError() { public void resolutionErrorBeforeChildLbCreated_returnErrorPickerIfAllClustersEncounterError() {
deliverConfigWithEdsAndLogicalDnsClusters(); ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
reset(helper); reset(helper);
xdsClient.deliverError(Status.UNIMPLEMENTED.withDescription("not found")); xdsClient.deliverError(Status.UNIMPLEMENTED.withDescription("not found"));
assertThat(childBalancers).isEmpty(); assertThat(childBalancers).isEmpty();
@ -672,7 +793,12 @@ public class ClusterResolverLoadBalancerTest {
@Test @Test
public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErrorPicker() { public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErrorPicker() {
deliverConfigWithEdsAndLogicalDnsClusters(); ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
reset(helper); reset(helper);
Status upstreamError = Status.UNAVAILABLE.withDescription("unreachable"); Status upstreamError = Status.UNAVAILABLE.withDescription("unreachable");
loadBalancer.handleNameResolutionError(upstreamError); loadBalancer.handleNameResolutionError(upstreamError);
@ -683,12 +809,19 @@ public class ClusterResolverLoadBalancerTest {
@Test @Test
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() { public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
deliverConfigWithEdsAndLogicalDnsClusters(); ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
reset(helper); reset(helper);
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
LocalityLbEndpoints localityLbEndpoints = LocalityLbEndpoints localityLbEndpoints =
buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true)); LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint1, 100, true)),
10 /* localityWeight */, 1 /* priority */);
xdsClient.deliverClusterLoadAssignment( xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints));
FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers);
@ -706,19 +839,6 @@ public class ClusterResolverLoadBalancerTest {
any(ConnectivityState.class), any(SubchannelPicker.class)); any(ConnectivityState.class), any(SubchannelPicker.class));
} }
private void deliverConfigWithEdsAndLogicalDnsClusters() {
DiscoveryMechanism instance1 =
DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, null);
DiscoveryMechanism instance2 =
DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, LRS_SERVER_NAME, 200L, null);
ClusterResolverConfig config =
new ClusterResolverConfig(Arrays.asList(instance1, instance2), weightedTarget, roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1);
assertThat(resolvers).hasSize(1);
assertThat(childBalancers).isEmpty();
}
private void deliverLbConfig(ClusterResolverConfig config) { private void deliverLbConfig(ClusterResolverConfig config) {
loadBalancer.handleResolvedAddresses( loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder() ResolvedAddresses.newBuilder()
@ -765,16 +885,6 @@ public class ClusterResolverLoadBalancerTest {
} }
} }
private static LocalityLbEndpoints buildLocalityLbEndpoints(
int priority, int localityWeight, Map<EquivalentAddressGroup, Boolean> managedEndpoints) {
List<LbEndpoint> endpoints = new ArrayList<>();
for (EquivalentAddressGroup addr : managedEndpoints.keySet()) {
boolean status = managedEndpoints.get(addr);
endpoints.add(LbEndpoint.create(addr, 100 /* unused */, status));
}
return LocalityLbEndpoints.create(endpoints, localityWeight, priority);
}
private static EquivalentAddressGroup makeAddress(final String name) { private static EquivalentAddressGroup makeAddress(final String name) {
class FakeSocketAddress extends SocketAddress { class FakeSocketAddress extends SocketAddress {
private final String name; private final String name;