xds: XdsClient support for hashing based load balancing (#7859)

This change adds two parts to XdsClient for receiving configurations that support hashing based load balancing policies:

- Each Route contains a list of HashPolicys, which specifies the hash value generation for requests routed to that Route.
- Each Cluster resource can specify lb policy other than "round_robin". If it is "ring_hash", it contains the configuration for mapping each RPC's hash value to one of the endpoints.
This commit is contained in:
Chengyuan Zhang 2021-02-11 23:54:28 -08:00 committed by GitHub
parent 986a36b947
commit 7b70161eef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 628 additions and 432 deletions

View File

@ -34,11 +34,7 @@ import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate; import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig;
import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayDeque; import java.util.ArrayDeque;
@ -153,17 +149,16 @@ final class CdsLoadBalancer2 extends LoadBalancer {
} }
if (clusterState.isLeaf) { if (clusterState.isLeaf) {
DiscoveryMechanism instance; DiscoveryMechanism instance;
if (clusterState.result instanceof EdsClusterConfig) { if (clusterState.result.clusterType() == ClusterType.EDS) {
EdsClusterConfig clusterConfig = (EdsClusterConfig) clusterState.result; instance = DiscoveryMechanism.forEds(
instance = DiscoveryMechanism.forEds(clusterState.name, clusterConfig.edsServiceName, clusterState.name, clusterState.result.edsServiceName(),
clusterConfig.lrsServerName, clusterConfig.maxConcurrentRequests, clusterState.result.lrsServerName(), clusterState.result.maxConcurrentRequests(),
clusterConfig.upstreamTlsContext); clusterState.result.upstreamTlsContext());
} else { // logical DNS } else { // logical DNS
LogicalDnsClusterConfig clusterConfig = instance = DiscoveryMechanism.forLogicalDns(
(LogicalDnsClusterConfig) clusterState.result; clusterState.name, clusterState.result.lrsServerName(),
instance = DiscoveryMechanism.forLogicalDns(clusterState.name, clusterState.result.maxConcurrentRequests(),
clusterConfig.lrsServerName, clusterConfig.maxConcurrentRequests, clusterState.result.upstreamTlsContext());
clusterConfig.upstreamTlsContext);
} }
instances.add(instance); instances.add(instance);
} else { } else {
@ -183,7 +178,7 @@ final class CdsLoadBalancer2 extends LoadBalancer {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable)); helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
return; return;
} }
String endpointPickingPolicy = root.result.lbPolicy; String endpointPickingPolicy = root.result.lbPolicy();
LoadBalancerProvider localityPickingLbProvider = LoadBalancerProvider localityPickingLbProvider =
lbRegistry.getProvider(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded lbRegistry.getProvider(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded
LoadBalancerProvider endpointPickingLbProvider = LoadBalancerProvider endpointPickingLbProvider =
@ -212,7 +207,7 @@ final class CdsLoadBalancer2 extends LoadBalancer {
@Nullable @Nullable
private Map<String, ClusterState> childClusterStates; private Map<String, ClusterState> childClusterStates;
@Nullable @Nullable
private ClusterConfig result; private CdsUpdate result;
// Following fields are effectively final. // Following fields are effectively final.
private boolean isLeaf; private boolean isLeaf;
private boolean discovered; private boolean discovered;
@ -281,15 +276,15 @@ final class CdsLoadBalancer2 extends LoadBalancer {
if (shutdown) { if (shutdown) {
return; return;
} }
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
discovered = true; discovered = true;
result = update.clusterConfig; result = update;
if (update.clusterType == ClusterType.AGGREGATE) { if (update.clusterType() == ClusterType.AGGREGATE) {
isLeaf = false; isLeaf = false;
AggregateClusterConfig clusterConfig = (AggregateClusterConfig) update.clusterConfig; logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}", update.clusterName); update.clusterName(), update.prioritizedClusterNames());
logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig);
Map<String, ClusterState> newChildStates = new LinkedHashMap<>(); Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : clusterConfig.prioritizedClusterNames) { for (String cluster : update.prioritizedClusterNames()) {
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) { if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState = new ClusterState(cluster); ClusterState childState = new ClusterState(cluster);
childState.start(); childState.start();
@ -304,18 +299,13 @@ final class CdsLoadBalancer2 extends LoadBalancer {
} }
} }
childClusterStates = newChildStates; childClusterStates = newChildStates;
} else if (update.clusterType == ClusterType.EDS) { } else if (update.clusterType() == ClusterType.EDS) {
isLeaf = true; isLeaf = true;
EdsClusterConfig clusterConfig = (EdsClusterConfig) update.clusterConfig;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}", logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
update.clusterName, clusterConfig.edsServiceName); update.clusterName(), update.edsServiceName());
logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig);
} else { // logical DNS } else { // logical DNS
isLeaf = true; isLeaf = true;
LogicalDnsClusterConfig clusterConfig = logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
(LogicalDnsClusterConfig) update.clusterConfig;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName);
logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig);
} }
handleClusterDiscovered(); handleClusterDiscovered();
} }

View File

@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.EnvoyProtoData.TRANSPORT_SOCKET_NAME_TLS; import static io.grpc.xds.EnvoyProtoData.TRANSPORT_SOCKET_NAME_TLS;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
@ -34,6 +35,7 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.CustomClusterType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.CustomClusterType;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig;
import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions; import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions;
import io.envoyproxy.envoy.config.core.v3.RoutingPriority; import io.envoyproxy.envoy.config.core.v3.RoutingPriority;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
@ -65,11 +67,9 @@ import io.grpc.xds.Matchers.PathMatcher;
import io.grpc.xds.VirtualHost.Route; import io.grpc.xds.VirtualHost.Route;
import io.grpc.xds.VirtualHost.Route.RouteAction; import io.grpc.xds.VirtualHost.Route.RouteAction;
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight; import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
import io.grpc.xds.VirtualHost.Route.RouteMatch; import io.grpc.xds.VirtualHost.Route.RouteMatch;
import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig; import io.grpc.xds.XdsClient.CdsUpdate.HashFunction;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig;
import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
@ -96,6 +96,8 @@ final class ClientXdsClient extends AbstractXdsClient {
@VisibleForTesting @VisibleForTesting
static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate"; static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate";
private static final String HTTP_FAULT_FILTER_NAME = "envoy.fault"; private static final String HTTP_FAULT_FILTER_NAME = "envoy.fault";
@VisibleForTesting
static final String HASH_POLICY_FILTER_STATE_KEY = "io.grpc.channel_id";
private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 = private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 =
"type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2" "type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2"
+ ".HttpConnectionManager"; + ".HttpConnectionManager";
@ -468,10 +470,42 @@ final class ClientXdsClient extends AbstractXdsClient {
timeoutNano = Durations.toNanos(maxStreamDuration.getMaxStreamDuration()); timeoutNano = Durations.toNanos(maxStreamDuration.getMaxStreamDuration());
} }
} }
List<ClusterWeight> weightedClusters; List<HashPolicy> hashPolicies = new ArrayList<>();
for (io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy config
: proto.getHashPolicyList()) {
HashPolicy policy = null;
boolean terminal = config.getTerminal();
switch (config.getPolicySpecifierCase()) {
case HEADER:
io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.Header headerCfg =
config.getHeader();
Pattern regEx = null;
String regExSubstitute = null;
if (headerCfg.hasRegexRewrite() && headerCfg.getRegexRewrite().hasPattern()
&& headerCfg.getRegexRewrite().getPattern().hasGoogleRe2()) {
regEx = Pattern.compile(headerCfg.getRegexRewrite().getPattern().getRegex());
regExSubstitute = headerCfg.getRegexRewrite().getSubstitution();
}
policy = HashPolicy.forHeader(
terminal, headerCfg.getHeaderName(), regEx, regExSubstitute);
break;
case FILTER_STATE:
if (config.getFilterState().getKey().equals(HASH_POLICY_FILTER_STATE_KEY)) {
policy = HashPolicy.forChannelId(terminal);
}
break;
default:
// Ignore
}
if (policy != null) {
hashPolicies.add(policy);
}
}
switch (proto.getClusterSpecifierCase()) { switch (proto.getClusterSpecifierCase()) {
case CLUSTER: case CLUSTER:
return StructOrError.fromStruct(RouteAction.forCluster(proto.getCluster(), timeoutNano)); return StructOrError.fromStruct(RouteAction.forCluster(
proto.getCluster(), hashPolicies, timeoutNano));
case CLUSTER_HEADER: case CLUSTER_HEADER:
return null; return null;
case WEIGHTED_CLUSTERS: case WEIGHTED_CLUSTERS:
@ -480,7 +514,7 @@ final class ClientXdsClient extends AbstractXdsClient {
if (clusterWeights.isEmpty()) { if (clusterWeights.isEmpty()) {
return StructOrError.fromError("No cluster found in weighted cluster list"); return StructOrError.fromError("No cluster found in weighted cluster list");
} }
weightedClusters = new ArrayList<>(); List<ClusterWeight> weightedClusters = new ArrayList<>();
for (io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight clusterWeight for (io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight clusterWeight
: clusterWeights) { : clusterWeights) {
StructOrError<ClusterWeight> clusterWeightOrError = parseClusterWeight(clusterWeight); StructOrError<ClusterWeight> clusterWeightOrError = parseClusterWeight(clusterWeight);
@ -492,7 +526,7 @@ final class ClientXdsClient extends AbstractXdsClient {
} }
// TODO(chengyuanzhang): validate if the sum of weights equals to total weight. // TODO(chengyuanzhang): validate if the sum of weights equals to total weight.
return StructOrError.fromStruct(RouteAction.forWeightedClusters( return StructOrError.fromStruct(RouteAction.forWeightedClusters(
weightedClusters, timeoutNano)); weightedClusters, hashPolicies, timeoutNano));
case CLUSTERSPECIFIER_NOT_SET: case CLUSTERSPECIFIER_NOT_SET:
default: default:
return StructOrError.fromError( return StructOrError.fromError(
@ -713,30 +747,44 @@ final class ClientXdsClient extends AbstractXdsClient {
if (!cdsResourceSubscribers.containsKey(clusterName)) { if (!cdsResourceSubscribers.containsKey(clusterName)) {
continue; continue;
} }
// The lb_policy field must be set to ROUND_ROBIN. StructOrError<CdsUpdate.Builder> structOrError;
if (!cluster.getLbPolicy().equals(LbPolicy.ROUND_ROBIN)) {
nackResponse(ResourceType.CDS, nonce,
"Cluster " + clusterName + ": unsupported Lb policy: " + cluster.getLbPolicy());
return;
}
String lbPolicy = "round_robin";
CdsUpdate update = null;
switch (cluster.getClusterDiscoveryTypeCase()) { switch (cluster.getClusterDiscoveryTypeCase()) {
case TYPE: case TYPE:
update = parseNonAggregateCluster(cluster, nonce, lbPolicy, edsResources); structOrError = parseNonAggregateCluster(cluster, edsResources);
break; break;
case CLUSTER_TYPE: case CLUSTER_TYPE:
update = parseAggregateCluster(cluster, nonce, lbPolicy); structOrError = parseAggregateCluster(cluster);
break; break;
case CLUSTERDISCOVERYTYPE_NOT_SET: case CLUSTERDISCOVERYTYPE_NOT_SET:
default: default:
nackResponse(ResourceType.CDS, nonce, nackResponse(ResourceType.CDS, nonce,
"Cluster " + clusterName + ": cluster discovery type unspecified"); "Cluster " + clusterName + ": cluster discovery type unspecified");
}
if (update == null) {
return; return;
} }
cdsUpdates.put(clusterName, update); if (structOrError.getErrorDetail() != null) {
nackResponse(ResourceType.CDS, nonce, structOrError.errorDetail);
return;
}
CdsUpdate.Builder updateBuilder = structOrError.getStruct();
String lbPolicy = CaseFormat.UPPER_UNDERSCORE.to(
CaseFormat.LOWER_UNDERSCORE, cluster.getLbPolicy().name());
if (cluster.getLbPolicy() == LbPolicy.RING_HASH) {
HashFunction hashFunction;
RingHashLbConfig lbConfig = cluster.getRingHashLbConfig();
if (lbConfig.getHashFunction() == RingHashLbConfig.HashFunction.XX_HASH) {
hashFunction = HashFunction.XX_HASH;
} else {
nackResponse(ResourceType.CDS, nonce,
"Cluster " + clusterName + ": unsupported ring hash function: "
+ lbConfig.getHashFunction());
return;
}
updateBuilder.lbPolicy(lbPolicy, lbConfig.getMinimumRingSize().getValue(),
lbConfig.getMaximumRingSize().getValue(), hashFunction);
} else {
updateBuilder.lbPolicy(lbPolicy);
}
cdsUpdates.put(clusterName, updateBuilder.build());
} }
ackResponse(ResourceType.CDS, versionInfo, nonce); ackResponse(ResourceType.CDS, versionInfo, nonce);
@ -756,18 +804,13 @@ final class ClientXdsClient extends AbstractXdsClient {
} }
} }
/** private static StructOrError<CdsUpdate.Builder> parseAggregateCluster(Cluster cluster) {
* Parses CDS resource for an aggregate cluster into {@link io.grpc.xds.XdsClient.CdsUpdate}.
* Returns {@code null} and nack the response with the given nonce if the resource is invalid.
*/
private CdsUpdate parseAggregateCluster(Cluster cluster, String nonce, String lbPolicy) {
String clusterName = cluster.getName(); String clusterName = cluster.getName();
CustomClusterType customType = cluster.getClusterType(); CustomClusterType customType = cluster.getClusterType();
String typeName = customType.getName(); String typeName = customType.getName();
if (!typeName.equals(AGGREGATE_CLUSTER_TYPE_NAME)) { if (!typeName.equals(AGGREGATE_CLUSTER_TYPE_NAME)) {
nackResponse(ResourceType.CDS, nonce, return StructOrError.fromError(
"Cluster " + clusterName + ": unsupported custom cluster type: " + typeName); "Cluster " + clusterName + ": unsupported custom cluster type: " + typeName);
return null;
} }
io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig clusterConfig; io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig clusterConfig;
Any unpackedClusterConfig = customType.getTypedConfig(); Any unpackedClusterConfig = customType.getTypedConfig();
@ -779,31 +822,23 @@ final class ClientXdsClient extends AbstractXdsClient {
clusterConfig = unpackedClusterConfig.unpack( clusterConfig = unpackedClusterConfig.unpack(
io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig.class); io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig.class);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
nackResponse(ResourceType.CDS, nonce, StructOrError.fromError("Cluster " + clusterName + ": malformed ClusterConfig: " + e);
"Cluster " + clusterName + ": invalid cluster config: " + e);
return null; return null;
} }
AggregateClusterConfig config = return StructOrError.fromStruct(CdsUpdate.forAggregate(
new AggregateClusterConfig(lbPolicy, clusterConfig.getClustersList()); clusterName, clusterConfig.getClustersList()));
return new CdsUpdate(clusterName, ClusterType.AGGREGATE, config);
} }
/** private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster(
* Parses CDS resource for a non-aggregate cluster (EDS or Logical DNS) into {@link Cluster cluster, Set<String> edsResources) {
* io.grpc.xds.XdsClient.CdsUpdate}. Returns {@code null} and nack the response with the given
* nonce if the resource is invalid.
*/
private CdsUpdate parseNonAggregateCluster(Cluster cluster, String nonce, String lbPolicy,
Set<String> edsResources) {
String clusterName = cluster.getName(); String clusterName = cluster.getName();
String lrsServerName = null; String lrsServerName = null;
Long maxConcurrentRequests = null; Long maxConcurrentRequests = null;
UpstreamTlsContext upstreamTlsContext = null; UpstreamTlsContext upstreamTlsContext = null;
if (cluster.hasLrsServer()) { if (cluster.hasLrsServer()) {
if (!cluster.getLrsServer().hasSelf()) { if (!cluster.getLrsServer().hasSelf()) {
nackResponse(ResourceType.CDS, nonce, return StructOrError.fromError(
"Cluster " + clusterName + ": only support LRS for the same management server"); "Cluster " + clusterName + ": only support LRS for the same management server");
return null;
} }
lrsServerName = ""; lrsServerName = "";
} }
@ -829,9 +864,8 @@ final class ClientXdsClient extends AbstractXdsClient {
unpacked = any.unpack( unpacked = any.unpack(
io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext.class); io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext.class);
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
nackResponse(ResourceType.CDS, nonce, return StructOrError.fromError(
"Cluster " + clusterName + ": invalid upstream TLS context: " + e); "Cluster " + clusterName + ": malformed UpstreamTlsContext: " + e);
return null;
} }
upstreamTlsContext = UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext(unpacked); upstreamTlsContext = UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext(unpacked);
} }
@ -842,9 +876,8 @@ final class ClientXdsClient extends AbstractXdsClient {
io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig edsClusterConfig = io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig edsClusterConfig =
cluster.getEdsClusterConfig(); cluster.getEdsClusterConfig();
if (!edsClusterConfig.getEdsConfig().hasAds()) { if (!edsClusterConfig.getEdsConfig().hasAds()) {
nackResponse(ResourceType.CDS, nonce, "Cluster " + clusterName + ": " return StructOrError.fromError("Cluster " + clusterName
+ "field eds_cluster_config must be set to indicate to use EDS over ADS."); + ": field eds_cluster_config must be set to indicate to use EDS over ADS.");
return null;
} }
// If the service_name field is set, that value will be used for the EDS request. // If the service_name field is set, that value will be used for the EDS request.
if (!edsClusterConfig.getServiceName().isEmpty()) { if (!edsClusterConfig.getServiceName().isEmpty()) {
@ -853,17 +886,14 @@ final class ClientXdsClient extends AbstractXdsClient {
} else { } else {
edsResources.add(clusterName); edsResources.add(clusterName);
} }
EdsClusterConfig config = new EdsClusterConfig(lbPolicy, edsServiceName, return StructOrError.fromStruct(CdsUpdate.forEds(
lrsServerName, maxConcurrentRequests, upstreamTlsContext); clusterName, edsServiceName, lrsServerName, maxConcurrentRequests, upstreamTlsContext));
return new CdsUpdate(clusterName, ClusterType.EDS, config);
} else if (type.equals(DiscoveryType.LOGICAL_DNS)) { } else if (type.equals(DiscoveryType.LOGICAL_DNS)) {
LogicalDnsClusterConfig config = new LogicalDnsClusterConfig(lbPolicy, lrsServerName, return StructOrError.fromStruct(CdsUpdate.forLogicalDns(
maxConcurrentRequests, upstreamTlsContext); clusterName, lrsServerName, maxConcurrentRequests, upstreamTlsContext));
return new CdsUpdate(clusterName, ClusterType.LOGICAL_DNS, config);
} }
nackResponse(ResourceType.CDS, nonce, return StructOrError.fromError(
"Cluster " + clusterName + ": unsupported built-in discovery type: " + type); "Cluster " + clusterName + ": unsupported built-in discovery type: " + type);
return null;
} }
@Override @Override

View File

@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue; import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.re2j.Pattern;
import io.grpc.xds.Matchers.FractionMatcher; import io.grpc.xds.Matchers.FractionMatcher;
import io.grpc.xds.Matchers.HeaderMatcher; import io.grpc.xds.Matchers.HeaderMatcher;
import io.grpc.xds.Matchers.PathMatcher; import io.grpc.xds.Matchers.PathMatcher;
@ -89,6 +90,9 @@ abstract class VirtualHost {
@AutoValue @AutoValue
abstract static class RouteAction { abstract static class RouteAction {
// List of hash policies to use for ring hash load balancing.
abstract ImmutableList<HashPolicy> hashPolicies();
@Nullable @Nullable
abstract Long timeoutNano(); abstract Long timeoutNano();
@ -98,21 +102,23 @@ abstract class VirtualHost {
@Nullable @Nullable
abstract ImmutableList<ClusterWeight> weightedClusters(); abstract ImmutableList<ClusterWeight> weightedClusters();
static RouteAction forCluster(String cluster, @Nullable Long timeoutNano) { static RouteAction forCluster(String cluster, List<HashPolicy> hashPolicies,
@Nullable Long timeoutNano) {
checkNotNull(cluster, "cluster"); checkNotNull(cluster, "cluster");
return RouteAction.create(timeoutNano, cluster, null); return RouteAction.create(hashPolicies, timeoutNano, cluster, null);
} }
static RouteAction forWeightedClusters(List<ClusterWeight> weightedClusters, static RouteAction forWeightedClusters(List<ClusterWeight> weightedClusters,
@Nullable Long timeoutNano) { List<HashPolicy> hashPolicies, @Nullable Long timeoutNano) {
checkNotNull(weightedClusters, "weightedClusters"); checkNotNull(weightedClusters, "weightedClusters");
checkArgument(!weightedClusters.isEmpty(), "empty cluster list"); checkArgument(!weightedClusters.isEmpty(), "empty cluster list");
return RouteAction.create(timeoutNano, null, weightedClusters); return RouteAction.create(hashPolicies, timeoutNano, null, weightedClusters);
} }
private static RouteAction create(@Nullable Long timeoutNano, @Nullable String cluster, private static RouteAction create(List<HashPolicy> hashPolicies, @Nullable Long timeoutNano,
@Nullable List<ClusterWeight> weightedClusters) { @Nullable String cluster, @Nullable List<ClusterWeight> weightedClusters) {
return new AutoValue_VirtualHost_Route_RouteAction(timeoutNano, cluster, return new AutoValue_VirtualHost_Route_RouteAction(
ImmutableList.copyOf(hashPolicies), timeoutNano, cluster,
weightedClusters == null ? null : ImmutableList.copyOf(weightedClusters)); weightedClusters == null ? null : ImmutableList.copyOf(weightedClusters));
} }
@ -130,6 +136,52 @@ abstract class VirtualHost {
httpFault); httpFault);
} }
} }
// Configuration for the route's hashing policy if the upstream cluster uses a hashing load
// balancer.
@AutoValue
abstract static class HashPolicy {
// The specifier that indicates the component of the request to be hashed on.
abstract Type type();
// The flag that short-circuits the hash computing.
abstract boolean isTerminal();
// The name of the request header that will be used to obtain the hash key.
// Only valid if type is HEADER.
@Nullable
abstract String headerName();
// The regular expression used to find portions to be replaced in the header value.
// Only valid if type is HEADER.
@Nullable
abstract Pattern regEx();
// The string that should be substituted into matching portions of the header value.
// Only valid if type is HEADER.
@Nullable
abstract String regExSubstitution();
static HashPolicy forHeader(boolean isTerminal, String headerName,
@Nullable Pattern regEx, @Nullable String regExSubstitution) {
checkNotNull(headerName, "headerName");
return HashPolicy.create(Type.HEADER, isTerminal, headerName, regEx, regExSubstitution);
}
static HashPolicy forChannelId(boolean isTerminal) {
return HashPolicy.create(Type.CHANNEL_ID, isTerminal, null, null, null);
}
private static HashPolicy create(Type type, boolean isTerminal, @Nullable String headerName,
@Nullable Pattern regEx, @Nullable String regExSubstitution) {
return new AutoValue_VirtualHost_Route_RouteAction_HashPolicy(type, isTerminal,
headerName, regEx, regExSubstitution);
}
enum Type {
HEADER, CHANNEL_ID
}
}
} }
} }
} }

View File

@ -19,8 +19,10 @@ package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.collect.ImmutableList;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LocalityLbEndpoints; import io.grpc.xds.Endpoints.LocalityLbEndpoints;
@ -155,197 +157,155 @@ abstract class XdsClient {
} }
} }
static final class CdsUpdate implements ResourceUpdate { /** xDS resource update for cluster-level configuration. */
final String clusterName; @AutoValue
final ClusterType clusterType; abstract static class CdsUpdate implements ResourceUpdate {
final ClusterConfig clusterConfig; abstract String clusterName();
CdsUpdate(String clusterName, ClusterType clusterType, ClusterConfig clusterConfig) { abstract ClusterType clusterType();
this.clusterName = checkNotNull(clusterName, "clusterName");
this.clusterType = checkNotNull(clusterType, "clusterType"); // Endpoint-level load balancing policy.
this.clusterConfig = checkNotNull(clusterConfig, "clusterConfig"); abstract String lbPolicy();
// Only valid if lbPolicy is "ring_hash".
abstract long minRingSize();
// Only valid if lbPolicy is "ring_hash".
abstract long maxRingSize();
// Only valid if lbPolicy is "ring_hash".
@Nullable
abstract HashFunction hashFunction();
// Alternative resource name to be used in EDS requests.
/// Only valid for EDS cluster.
@Nullable
abstract String edsServiceName();
// Load report server name for reporting loads via LRS.
// Only valid for EDS or LOGICAL_DNS cluster.
@Nullable
abstract String lrsServerName();
// Max number of concurrent requests can be sent to this cluster.
// Only valid for EDS or LOGICAL_DNS cluster.
@Nullable
abstract Long maxConcurrentRequests();
// TLS context used to connect to connect to this cluster.
// Only valid for EDS or LOGICAL_DNS cluster.
@Nullable
abstract UpstreamTlsContext upstreamTlsContext();
// List of underlying clusters making of this aggregate cluster.
// Only valid for AGGREGATE cluster.
@Nullable
abstract ImmutableList<String> prioritizedClusterNames();
static Builder forAggregate(String clusterName, List<String> prioritizedClusterNames) {
checkNotNull(prioritizedClusterNames, "prioritizedClusterNames");
return new AutoValue_XdsClient_CdsUpdate.Builder()
.clusterName(clusterName)
.clusterType(ClusterType.AGGREGATE)
.minRingSize(0)
.maxRingSize(0)
.prioritizedClusterNames(ImmutableList.copyOf(prioritizedClusterNames));
} }
@Override static Builder forEds(String clusterName, @Nullable String edsServiceName,
public int hashCode() { @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests,
return Objects.hash(clusterName, clusterType, clusterConfig); @Nullable UpstreamTlsContext upstreamTlsContext) {
return new AutoValue_XdsClient_CdsUpdate.Builder()
.clusterName(clusterName)
.clusterType(ClusterType.EDS)
.minRingSize(0)
.maxRingSize(0)
.edsServiceName(edsServiceName)
.lrsServerName(lrsServerName)
.maxConcurrentRequests(maxConcurrentRequests)
.upstreamTlsContext(upstreamTlsContext);
} }
@Override static Builder forLogicalDns(String clusterName, @Nullable String lrsServerName,
public boolean equals(Object o) { @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) {
if (this == o) { return new AutoValue_XdsClient_CdsUpdate.Builder()
return true; .clusterName(clusterName)
} .clusterType(ClusterType.LOGICAL_DNS)
if (o == null || getClass() != o.getClass()) { .minRingSize(0)
return false; .maxRingSize(0)
} .lrsServerName(lrsServerName)
CdsUpdate that = (CdsUpdate) o; .maxConcurrentRequests(maxConcurrentRequests)
return Objects.equals(clusterName, that.clusterName) .upstreamTlsContext(upstreamTlsContext);
&& Objects.equals(clusterType, that.clusterType)
&& Objects.equals(clusterConfig, that.clusterConfig);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("clusterName", clusterName)
.add("clusterType", clusterType)
.add("clusterConfig", clusterConfig)
.toString();
} }
enum ClusterType { enum ClusterType {
EDS, LOGICAL_DNS, AGGREGATE EDS, LOGICAL_DNS, AGGREGATE
} }
abstract static class ClusterConfig { enum HashFunction {
// Endpoint level load balancing policy. XX_HASH
final String lbPolicy;
private ClusterConfig(String lbPolicy) {
this.lbPolicy = checkNotNull(lbPolicy, "lbPolicy");
}
}
static final class AggregateClusterConfig extends ClusterConfig {
// List of underlying clusters making of this aggregate cluster.
final List<String> prioritizedClusterNames;
AggregateClusterConfig(String lbPolicy, List<String> prioritizedClusterNames) {
super(lbPolicy);
this.prioritizedClusterNames =
Collections.unmodifiableList(new ArrayList<>(prioritizedClusterNames));
} }
// FIXME(chengyuanzhang): delete this after UpstreamTlsContext's toString() is fixed.
@Override @Override
public int hashCode() { public final String toString() {
return Objects.hash(lbPolicy, prioritizedClusterNames);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AggregateClusterConfig that = (AggregateClusterConfig) o;
return Objects.equals(lbPolicy, that.lbPolicy)
&& Objects.equals(prioritizedClusterNames, that.prioritizedClusterNames);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this) return MoreObjects.toStringHelper(this)
.add("lbPolicy", lbPolicy) .add("clusterName", clusterName())
.add("prioritizedClusterNames", prioritizedClusterNames) .add("clusterType", clusterType())
.toString(); .add("lbPolicy", lbPolicy())
} .add("minRingSize", minRingSize())
} .add("maxRingSize", maxRingSize())
.add("hashFunction", hashFunction())
private abstract static class NonAggregateClusterConfig extends ClusterConfig { .add("edsServiceName", edsServiceName())
// Load report server name for reporting loads via LRS. .add("lrsServerName", lrsServerName())
@Nullable .add("maxConcurrentRequests", maxConcurrentRequests())
final String lrsServerName;
// Max number of concurrent requests can be sent to this cluster.
// FIXME(chengyuanzhang): protobuf uint32 is int in Java, so this field can be Integer.
@Nullable
final Long maxConcurrentRequests;
// TLS context used to connect to connect to this cluster.
@Nullable
final UpstreamTlsContext upstreamTlsContext;
private NonAggregateClusterConfig(String lbPolicy, @Nullable String lrsServerName,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) {
super(lbPolicy);
this.lrsServerName = lrsServerName;
this.maxConcurrentRequests = maxConcurrentRequests;
this.upstreamTlsContext = upstreamTlsContext;
}
}
static final class EdsClusterConfig extends NonAggregateClusterConfig {
// Alternative resource name to be used in EDS requests.
@Nullable
final String edsServiceName;
EdsClusterConfig(String lbPolicy, @Nullable String edsServiceName,
@Nullable String lrsServerName, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext upstreamTlsContext) {
super(lbPolicy, lrsServerName, maxConcurrentRequests, upstreamTlsContext);
this.edsServiceName = edsServiceName;
}
@Override
public int hashCode() {
return Objects.hash(lbPolicy, edsServiceName, lrsServerName, maxConcurrentRequests,
upstreamTlsContext);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EdsClusterConfig that = (EdsClusterConfig) o;
return Objects.equals(lbPolicy, that.lbPolicy)
&& Objects.equals(edsServiceName, that.edsServiceName)
&& Objects.equals(lrsServerName, that.lrsServerName)
&& Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests)
&& Objects.equals(upstreamTlsContext, that.upstreamTlsContext);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("lbPolicy", lbPolicy)
.add("edsServiceName", edsServiceName)
.add("lrsServerName", lrsServerName)
.add("maxConcurrentRequests", maxConcurrentRequests)
// Exclude upstreamTlsContext as its string representation is cumbersome. // Exclude upstreamTlsContext as its string representation is cumbersome.
.add("prioritizedClusterNames", prioritizedClusterNames())
.toString(); .toString();
} }
@AutoValue.Builder
abstract static class Builder {
// Private do not use.
protected abstract Builder clusterName(String clusterName);
// Private do not use.
protected abstract Builder clusterType(ClusterType clusterType);
// Private do not use.
protected abstract Builder lbPolicy(String lbPolicy);
Builder lbPolicy(String lbPolicy, long minRingSize, long maxRingSize,
HashFunction hashFunction) {
return this.lbPolicy(lbPolicy).minRingSize(minRingSize).maxRingSize(maxRingSize)
.hashFunction(checkNotNull(hashFunction, "hashFunction"));
} }
static final class LogicalDnsClusterConfig extends NonAggregateClusterConfig { // Private do not use.
LogicalDnsClusterConfig(String lbPolicy, @Nullable String lrsServerName, protected abstract Builder minRingSize(long minRingSize);
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) {
super(lbPolicy, lrsServerName, maxConcurrentRequests, upstreamTlsContext);
}
@Override // Private do not use.
public int hashCode() { protected abstract Builder maxRingSize(long maxRingSize);
return Objects.hash(lbPolicy, lrsServerName, maxConcurrentRequests, upstreamTlsContext);
}
@Override // Private do not use.
public boolean equals(Object o) { protected abstract Builder hashFunction(HashFunction hashFunction);
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LogicalDnsClusterConfig that = (LogicalDnsClusterConfig) o;
return Objects.equals(lbPolicy, that.lbPolicy)
&& Objects.equals(lrsServerName, that.lrsServerName)
&& Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests)
&& Objects.equals(upstreamTlsContext, that.upstreamTlsContext);
}
@Override // Private do not use.
public String toString() { protected abstract Builder edsServiceName(String edsServiceName);
return MoreObjects.toStringHelper(this)
.add("lbPolicy", lbPolicy) // Private do not use.
.add("lrsServerName", lrsServerName) protected abstract Builder lrsServerName(String lrsServerName);
.add("maxConcurrentRequests", maxConcurrentRequests)
// Exclude upstreamTlsContext as its string representation is cumbersome. // Private do not use.
.toString(); protected abstract Builder maxConcurrentRequests(Long maxConcurrentRequests);
}
// Private do not use.
protected abstract Builder upstreamTlsContext(UpstreamTlsContext upstreamTlsContext);
// Private do not use.
protected abstract Builder prioritizedClusterNames(List<String> prioritizedClusterNames);
abstract CdsUpdate build();
} }
} }

View File

@ -47,10 +47,6 @@ import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil; import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -548,9 +544,9 @@ public class CdsLoadBalancer2Test {
@Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext) { @Nullable UpstreamTlsContext tlsContext) {
if (watchers.containsKey(clusterName)) { if (watchers.containsKey(clusterName)) {
EdsClusterConfig clusterConfig = new EdsClusterConfig("round_robin", edsServiceName, CdsUpdate update = CdsUpdate.forEds(
lrsServerName, maxConcurrentRequests, tlsContext); clusterName, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext)
CdsUpdate update = new CdsUpdate(clusterName, ClusterType.EDS, clusterConfig); .lbPolicy("round_robin").build();
watchers.get(clusterName).onChanged(update); watchers.get(clusterName).onChanged(update);
} }
} }
@ -558,17 +554,17 @@ public class CdsLoadBalancer2Test {
private void deliverLogicalDnsCluster(String clusterName, @Nullable String lrsServerName, private void deliverLogicalDnsCluster(String clusterName, @Nullable String lrsServerName,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext) { @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext) {
if (watchers.containsKey(clusterName)) { if (watchers.containsKey(clusterName)) {
LogicalDnsClusterConfig clusterConfig = new LogicalDnsClusterConfig("round_robin", CdsUpdate update = CdsUpdate.forLogicalDns(
lrsServerName, maxConcurrentRequests, tlsContext); clusterName, lrsServerName, maxConcurrentRequests, tlsContext)
CdsUpdate update = new CdsUpdate(clusterName, ClusterType.LOGICAL_DNS, clusterConfig); .lbPolicy("round_robin").build();
watchers.get(clusterName).onChanged(update); watchers.get(clusterName).onChanged(update);
} }
} }
private void deliverAggregateCluster(String clusterName, List<String> clusters) { private void deliverAggregateCluster(String clusterName, List<String> clusters) {
if (watchers.containsKey(clusterName)) { if (watchers.containsKey(clusterName)) {
AggregateClusterConfig clusterConfig = new AggregateClusterConfig("round_robin", clusters); CdsUpdate update = CdsUpdate.forAggregate(clusterName, clusters)
CdsUpdate update = new CdsUpdate(clusterName, ClusterType.AGGREGATE, clusterConfig); .lbPolicy("round_robin").build();
watchers.get(clusterName).onChanged(update); watchers.get(clusterName).onChanged(update);
} }
} }

View File

@ -29,10 +29,16 @@ import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
import io.envoyproxy.envoy.config.route.v3.DirectResponseAction; import io.envoyproxy.envoy.config.route.v3.DirectResponseAction;
import io.envoyproxy.envoy.config.route.v3.FilterAction; import io.envoyproxy.envoy.config.route.v3.FilterAction;
import io.envoyproxy.envoy.config.route.v3.RedirectAction; import io.envoyproxy.envoy.config.route.v3.RedirectAction;
import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.ConnectionProperties;
import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.FilterState;
import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.Header;
import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.QueryParameter;
import io.envoyproxy.envoy.config.route.v3.RouteAction.MaxStreamDuration; import io.envoyproxy.envoy.config.route.v3.RouteAction.MaxStreamDuration;
import io.envoyproxy.envoy.config.route.v3.WeightedCluster; import io.envoyproxy.envoy.config.route.v3.WeightedCluster;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort.HeaderAbort; import io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort.HeaderAbort;
import io.envoyproxy.envoy.type.matcher.v3.RegexMatchAndSubstitute;
import io.envoyproxy.envoy.type.matcher.v3.RegexMatcher; import io.envoyproxy.envoy.type.matcher.v3.RegexMatcher;
import io.envoyproxy.envoy.type.matcher.v3.RegexMatcher.GoogleRE2;
import io.envoyproxy.envoy.type.v3.FractionalPercent; import io.envoyproxy.envoy.type.v3.FractionalPercent;
import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType; import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType;
import io.envoyproxy.envoy.type.v3.Int64Range; import io.envoyproxy.envoy.type.v3.Int64Range;
@ -47,9 +53,11 @@ import io.grpc.xds.Matchers.PathMatcher;
import io.grpc.xds.VirtualHost.Route; import io.grpc.xds.VirtualHost.Route;
import io.grpc.xds.VirtualHost.Route.RouteAction; import io.grpc.xds.VirtualHost.Route.RouteAction;
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight; import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
import io.grpc.xds.VirtualHost.Route.RouteMatch; import io.grpc.xds.VirtualHost.Route.RouteMatch;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -77,7 +85,8 @@ public class ClientXdsClientDataTest {
Route.create( Route.create(
RouteMatch.create(PathMatcher.fromPath("/service/method", false), RouteMatch.create(PathMatcher.fromPath("/service/method", false),
Collections.<HeaderMatcher>emptyList(), null), Collections.<HeaderMatcher>emptyList(), null),
RouteAction.forCluster("cluster-foo", null), null)); RouteAction.forCluster("cluster-foo", Collections.<HashPolicy>emptyList(), null),
null));
} }
@Test @Test
@ -390,6 +399,50 @@ public class ClientXdsClientDataTest {
assertThat(struct.getStruct().timeoutNano()).isNull(); assertThat(struct.getStruct().timeoutNano()).isNull();
} }
@Test
public void parseRouteAction_withHashPolicies() {
io.envoyproxy.envoy.config.route.v3.RouteAction proto =
io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
.setCluster("cluster-foo")
.addHashPolicy(
io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder()
.setHeader(
Header.newBuilder()
.setHeaderName("user-agent")
.setRegexRewrite(
RegexMatchAndSubstitute.newBuilder()
.setPattern(
RegexMatcher.newBuilder()
.setGoogleRe2(GoogleRE2.getDefaultInstance())
.setRegex("grpc.*"))
.setSubstitution("gRPC"))))
.addHashPolicy(
io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder()
.setConnectionProperties(ConnectionProperties.newBuilder().setSourceIp(true))
.setTerminal(true)) // unsupported
.addHashPolicy(
io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder()
.setFilterState(
FilterState.newBuilder()
.setKey(ClientXdsClient.HASH_POLICY_FILTER_STATE_KEY)))
.addHashPolicy(
io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder()
.setQueryParameter(
QueryParameter.newBuilder().setName("param"))) // unsupported
.build();
StructOrError<RouteAction> struct = ClientXdsClient.parseRouteAction(proto);
List<HashPolicy> policies = struct.getStruct().hashPolicies();
assertThat(policies).hasSize(2);
assertThat(policies.get(0).type()).isEqualTo(HashPolicy.Type.HEADER);
assertThat(policies.get(0).headerName()).isEqualTo("user-agent");
assertThat(policies.get(0).isTerminal()).isFalse();
assertThat(policies.get(0).regEx().pattern()).isEqualTo("grpc.*");
assertThat(policies.get(0).regExSubstitution()).isEqualTo("gRPC");
assertThat(policies.get(1).type()).isEqualTo(HashPolicy.Type.CHANNEL_ID);
assertThat(policies.get(1).isTerminal()).isFalse();
}
@Test @Test
public void parseClusterWeight() { public void parseClusterWeight() {
io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight proto = io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight proto =

View File

@ -47,14 +47,11 @@ import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LbEndpoint;
import io.grpc.xds.Endpoints.LocalityLbEndpoints; import io.grpc.xds.Endpoints.LocalityLbEndpoints;
import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate; import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig; import io.grpc.xds.XdsClient.CdsUpdate.HashFunction;
import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig;
import io.grpc.xds.XdsClient.EdsResourceWatcher; import io.grpc.xds.XdsClient.EdsResourceWatcher;
import io.grpc.xds.XdsClient.EdsUpdate; import io.grpc.xds.XdsClient.EdsUpdate;
import io.grpc.xds.XdsClient.LdsResourceWatcher; import io.grpc.xds.XdsClient.LdsResourceWatcher;
@ -597,8 +594,10 @@ public abstract class ClientXdsClientTestBase {
startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher);
List<Any> clusters = ImmutableList.of( List<Any> clusters = ImmutableList.of(
Any.pack(mf.buildEdsCluster("cluster-bar.googleapis.com", null, false, null, null)), Any.pack(mf.buildEdsCluster("cluster-bar.googleapis.com", null, "round_robin", null,
Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, false, null, null))); false, null, null)),
Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, "round_robin", null,
false, null, null)));
call.sendResponse("0", clusters, ResourceType.CDS, "0000"); call.sendResponse("0", clusters, ResourceType.CDS, "0000");
// Client sent an ACK CDS request. // Client sent an ACK CDS request.
@ -616,7 +615,7 @@ public abstract class ClientXdsClientTestBase {
DiscoveryRpcCall call = DiscoveryRpcCall call =
startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher);
List<Any> clusters = ImmutableList.of( List<Any> clusters = ImmutableList.of(
Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, null))); Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null)));
call.sendResponse("0", clusters, ResourceType.CDS, "0000"); call.sendResponse("0", clusters, ResourceType.CDS, "0000");
// Client sent an ACK CDS request. // Client sent an ACK CDS request.
@ -624,14 +623,41 @@ public abstract class ClientXdsClientTestBase {
"0000"); "0000");
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(clusterConfig.edsServiceName).isNull(); assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(clusterConfig.lrsServerName).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(clusterConfig.maxConcurrentRequests).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
assertThat(clusterConfig.upstreamTlsContext).isNull(); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
}
@Test
public void cdsResourceFound_ringHashLbPolicy() {
DiscoveryRpcCall call =
startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher);
Message ringHashConfig = mf.buildRingHashLbConfig("xx_hash", 10L, 100L);
List<Any> clusters = ImmutableList.of(
Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "ring_hash", ringHashConfig, false, null,
null)));
call.sendResponse("0", clusters, ResourceType.CDS, "0000");
// Client sent an ACK CDS request.
call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS,
"0000");
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo("ring_hash");
assertThat(cdsUpdate.hashFunction()).isEqualTo(HashFunction.XX_HASH);
assertThat(cdsUpdate.minRingSize()).isEqualTo(10L);
assertThat(cdsUpdate.maxRingSize()).isEqualTo(100L);
assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
} }
@ -642,7 +668,7 @@ public abstract class ClientXdsClientTestBase {
List<String> candidates = Arrays.asList( List<String> candidates = Arrays.asList(
"cluster1.googleapis.com", "cluster2.googleapis.com", "cluster3.googleapis.com"); "cluster1.googleapis.com", "cluster2.googleapis.com", "cluster3.googleapis.com");
List<Any> clusters = ImmutableList.of( List<Any> clusters = ImmutableList.of(
Any.pack(mf.buildAggregateCluster(CDS_RESOURCE, candidates))); Any.pack(mf.buildAggregateCluster(CDS_RESOURCE, "round_robin", null, candidates)));
call.sendResponse("0", clusters, ResourceType.CDS, "0000"); call.sendResponse("0", clusters, ResourceType.CDS, "0000");
// Client sent an ACK CDS request. // Client sent an ACK CDS request.
@ -650,12 +676,10 @@ public abstract class ClientXdsClientTestBase {
"0000"); "0000");
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.AGGREGATE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.AGGREGATE);
AggregateClusterConfig clusterConfig = (AggregateClusterConfig) cdsUpdate.clusterConfig; assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); assertThat(cdsUpdate.prioritizedClusterNames()).containsExactlyElementsIn(candidates).inOrder();
assertThat(clusterConfig.prioritizedClusterNames)
.containsExactlyElementsIn(candidates).inOrder();
} }
@Test @Test
@ -663,7 +687,7 @@ public abstract class ClientXdsClientTestBase {
DiscoveryRpcCall call = DiscoveryRpcCall call =
startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher);
List<Any> clusters = ImmutableList.of( List<Any> clusters = ImmutableList.of(
Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null,
mf.buildCircuitBreakers(50, 200)))); mf.buildCircuitBreakers(50, 200))));
call.sendResponse("0", clusters, ResourceType.CDS, "0000"); call.sendResponse("0", clusters, ResourceType.CDS, "0000");
@ -672,14 +696,13 @@ public abstract class ClientXdsClientTestBase {
"0000"); "0000");
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(clusterConfig.edsServiceName).isNull(); assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(clusterConfig.lrsServerName).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isEqualTo(200L);
assertThat(clusterConfig.maxConcurrentRequests).isEqualTo(200L); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
assertThat(clusterConfig.upstreamTlsContext).isNull();
} }
/** /**
@ -692,10 +715,13 @@ public abstract class ClientXdsClientTestBase {
// Management server sends back CDS response with UpstreamTlsContext. // Management server sends back CDS response with UpstreamTlsContext.
List<Any> clusters = ImmutableList.of( List<Any> clusters = ImmutableList.of(
Any.pack(mf.buildLogicalDnsCluster("cluster-bar.googleapis.com", false, null, null)), Any.pack(mf.buildLogicalDnsCluster("cluster-bar.googleapis.com", "round_robin", null,
Any.pack(mf.buildEdsCluster(CDS_RESOURCE, "eds-cluster-foo.googleapis.com", true, false, null, null)),
Any.pack(mf.buildEdsCluster(CDS_RESOURCE, "eds-cluster-foo.googleapis.com", "round_robin",
null, true,
mf.buildUpstreamTlsContext("secret1", "unix:/var/uds2"), null)), mf.buildUpstreamTlsContext("secret1", "unix:/var/uds2"), null)),
Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, false, null, null))); Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, "round_robin", null, false,
null, null)));
call.sendResponse("0", clusters, ResourceType.CDS, "0000"); call.sendResponse("0", clusters, ResourceType.CDS, "0000");
// Client sent an ACK CDS request. // Client sent an ACK CDS request.
@ -703,10 +729,8 @@ public abstract class ClientXdsClientTestBase {
"0000"); "0000");
verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture()); verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
UpstreamTlsContext upstreamTlsContext = SdsSecretConfig validationContextSdsSecretConfig =
((EdsClusterConfig) cdsUpdate.clusterConfig).upstreamTlsContext; cdsUpdate.upstreamTlsContext().getCommonTlsContext().getValidationContextSdsSecretConfig();
SdsSecretConfig validationContextSdsSecretConfig = upstreamTlsContext.getCommonTlsContext()
.getValidationContextSdsSecretConfig();
assertThat(validationContextSdsSecretConfig.getName()).isEqualTo("secret1"); assertThat(validationContextSdsSecretConfig.getName()).isEqualTo("secret1");
assertThat( assertThat(
Iterables.getOnlyElement( Iterables.getOnlyElement(
@ -724,7 +748,7 @@ public abstract class ClientXdsClientTestBase {
DiscoveryRpcCall call = DiscoveryRpcCall call =
startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher);
List<Any> clusters = ImmutableList.of( List<Any> clusters = ImmutableList.of(
Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, null))); Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null)));
call.sendResponse("0", clusters, ResourceType.CDS, "0000"); call.sendResponse("0", clusters, ResourceType.CDS, "0000");
// Client sends an ACK CDS request. // Client sends an ACK CDS request.
@ -735,14 +759,13 @@ public abstract class ClientXdsClientTestBase {
xdsClient.watchCdsResource(CDS_RESOURCE, watcher); xdsClient.watchCdsResource(CDS_RESOURCE, watcher);
verify(watcher).onChanged(cdsUpdateCaptor.capture()); verify(watcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(clusterConfig.edsServiceName).isNull(); assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(clusterConfig.lrsServerName).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(clusterConfig.maxConcurrentRequests).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
assertThat(clusterConfig.upstreamTlsContext).isNull();
call.verifyNoMoreRequest(); call.verifyNoMoreRequest();
} }
@ -763,7 +786,7 @@ public abstract class ClientXdsClientTestBase {
DiscoveryRpcCall call = DiscoveryRpcCall call =
startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher);
List<Any> clusters = ImmutableList.of( List<Any> clusters = ImmutableList.of(
Any.pack(mf.buildLogicalDnsCluster(CDS_RESOURCE, false, null, null))); Any.pack(mf.buildLogicalDnsCluster(CDS_RESOURCE, "round_robin", null, false, null, null)));
call.sendResponse("0", clusters, ResourceType.CDS, "0000"); call.sendResponse("0", clusters, ResourceType.CDS, "0000");
// Client sends an ACK CDS request. // Client sends an ACK CDS request.
@ -771,17 +794,17 @@ public abstract class ClientXdsClientTestBase {
"0000"); "0000");
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.LOGICAL_DNS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS);
LogicalDnsClusterConfig dnsConfig = (LogicalDnsClusterConfig) cdsUpdate.clusterConfig; assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
assertThat(dnsConfig.lbPolicy).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(dnsConfig.lrsServerName).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(dnsConfig.maxConcurrentRequests).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
assertThat(dnsConfig.upstreamTlsContext).isNull();
String edsService = "eds-service-bar.googleapis.com"; String edsService = "eds-service-bar.googleapis.com";
clusters = ImmutableList.of( clusters = ImmutableList.of(
Any.pack(mf.buildEdsCluster(CDS_RESOURCE, edsService, true, null, null))); Any.pack(mf.buildEdsCluster(CDS_RESOURCE, edsService, "round_robin", null, true, null,
null)));
call.sendResponse("1", clusters, ResourceType.CDS, "0001"); call.sendResponse("1", clusters, ResourceType.CDS, "0001");
// Client sends an ACK CDS request. // Client sends an ACK CDS request.
@ -789,14 +812,13 @@ public abstract class ClientXdsClientTestBase {
"0001"); "0001");
verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture());
cdsUpdate = cdsUpdateCaptor.getValue(); cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
EdsClusterConfig edsConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
assertThat(edsConfig.edsServiceName).isEqualTo(edsService); assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
assertThat(edsConfig.lbPolicy).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerName()).isEqualTo("");
assertThat(edsConfig.lrsServerName).isEqualTo(""); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(edsConfig.maxConcurrentRequests).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
assertThat(edsConfig.upstreamTlsContext).isNull();
} }
@Test @Test
@ -804,7 +826,7 @@ public abstract class ClientXdsClientTestBase {
DiscoveryRpcCall call = DiscoveryRpcCall call =
startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher);
List<Any> clusters = ImmutableList.of( List<Any> clusters = ImmutableList.of(
Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, null))); Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null)));
call.sendResponse("0", clusters, ResourceType.CDS, "0000"); call.sendResponse("0", clusters, ResourceType.CDS, "0000");
// Client sends an ACK CDS request. // Client sends an ACK CDS request.
@ -812,14 +834,13 @@ public abstract class ClientXdsClientTestBase {
"0000"); "0000");
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(clusterConfig.edsServiceName).isNull(); assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(clusterConfig.lrsServerName).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(clusterConfig.maxConcurrentRequests).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
assertThat(clusterConfig.upstreamTlsContext).isNull();
call.sendResponse("1", Collections.<Any>emptyList(), ResourceType.CDS, "0001"); call.sendResponse("1", Collections.<Any>emptyList(), ResourceType.CDS, "0001");
@ -847,38 +868,36 @@ public abstract class ClientXdsClientTestBase {
String edsService = "eds-service-bar.googleapis.com"; String edsService = "eds-service-bar.googleapis.com";
List<Any> clusters = ImmutableList.of( List<Any> clusters = ImmutableList.of(
Any.pack(mf.buildLogicalDnsCluster(CDS_RESOURCE, false, null, null)), Any.pack(mf.buildLogicalDnsCluster(CDS_RESOURCE, "round_robin", null, false, null, null)),
Any.pack(mf.buildEdsCluster(cdsResource, edsService, true, null, null))); Any.pack(mf.buildEdsCluster(cdsResource, edsService, "round_robin", null, true, null,
null)));
call.sendResponse("0", clusters, ResourceType.CDS, "0000"); call.sendResponse("0", clusters, ResourceType.CDS, "0000");
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.LOGICAL_DNS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS);
LogicalDnsClusterConfig dnsConfig = (LogicalDnsClusterConfig) cdsUpdate.clusterConfig; assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
assertThat(dnsConfig.lbPolicy).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerName()).isNull();
assertThat(dnsConfig.lrsServerName).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(dnsConfig.maxConcurrentRequests).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
assertThat(dnsConfig.upstreamTlsContext).isNull();
verify(watcher1).onChanged(cdsUpdateCaptor.capture()); verify(watcher1).onChanged(cdsUpdateCaptor.capture());
cdsUpdate = cdsUpdateCaptor.getValue(); cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName).isEqualTo(cdsResource); assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResource);
assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
EdsClusterConfig edsConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
assertThat(edsConfig.edsServiceName).isEqualTo(edsService); assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
assertThat(edsConfig.lbPolicy).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerName()).isEqualTo("");
assertThat(edsConfig.lrsServerName).isEqualTo(""); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(edsConfig.maxConcurrentRequests).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
assertThat(edsConfig.upstreamTlsContext).isNull();
verify(watcher2).onChanged(cdsUpdateCaptor.capture()); verify(watcher2).onChanged(cdsUpdateCaptor.capture());
cdsUpdate = cdsUpdateCaptor.getValue(); cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName).isEqualTo(cdsResource); assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResource);
assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
edsConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
assertThat(edsConfig.edsServiceName).isEqualTo(edsService); assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin");
assertThat(edsConfig.lbPolicy).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerName()).isEqualTo("");
assertThat(edsConfig.lrsServerName).isEqualTo(""); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(edsConfig.maxConcurrentRequests).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull();
assertThat(edsConfig.upstreamTlsContext).isNull();
} }
@Test @Test
@ -1094,17 +1113,18 @@ public abstract class ClientXdsClientTestBase {
xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
List<Any> clusters = ImmutableList.of( List<Any> clusters = ImmutableList.of(
Any.pack(mf.buildEdsCluster(resource, null, true, null, null)), Any.pack(mf.buildEdsCluster(resource, null, "round_robin", null, true, null, null)),
Any.pack(mf.buildEdsCluster(CDS_RESOURCE, EDS_RESOURCE, false, null, null))); Any.pack(mf.buildEdsCluster(CDS_RESOURCE, EDS_RESOURCE, "round_robin", null, false, null,
null)));
call.sendResponse("0", clusters, ResourceType.CDS, "0000"); call.sendResponse("0", clusters, ResourceType.CDS, "0000");
verify(cdsWatcher).onChanged(cdsUpdateCaptor.capture()); verify(cdsWatcher).onChanged(cdsUpdateCaptor.capture());
EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdateCaptor.getValue().clusterConfig; CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(clusterConfig.edsServiceName).isEqualTo(null); assertThat(cdsUpdate.edsServiceName()).isEqualTo(null);
assertThat(clusterConfig.lrsServerName).isEqualTo(""); assertThat(cdsUpdate.lrsServerName()).isEqualTo("");
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
clusterConfig = (EdsClusterConfig) cdsUpdateCaptor.getValue().clusterConfig; cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(clusterConfig.edsServiceName).isEqualTo(EDS_RESOURCE); assertThat(cdsUpdate.edsServiceName()).isEqualTo(EDS_RESOURCE);
assertThat(clusterConfig.lrsServerName).isNull(); assertThat(cdsUpdate.lrsServerName()).isNull();
List<Any> clusterLoadAssignments = List<Any> clusterLoadAssignments =
ImmutableList.of( ImmutableList.of(
@ -1134,12 +1154,12 @@ public abstract class ClientXdsClientTestBase {
assertThat(edsUpdateCaptor.getValue().clusterName).isEqualTo(EDS_RESOURCE); assertThat(edsUpdateCaptor.getValue().clusterName).isEqualTo(EDS_RESOURCE);
clusters = ImmutableList.of( clusters = ImmutableList.of(
Any.pack(mf.buildEdsCluster(resource, null, true, null, null)), // no change Any.pack(mf.buildEdsCluster(resource, null, "round_robin", null, true, null,
Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, null))); null)), // no change
Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null)));
call.sendResponse("1", clusters, ResourceType.CDS, "0001"); call.sendResponse("1", clusters, ResourceType.CDS, "0001");
verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture());
clusterConfig = (EdsClusterConfig) cdsUpdateCaptor.getValue().clusterConfig; assertThat(cdsUpdateCaptor.getValue().edsServiceName()).isNull();
assertThat(clusterConfig.edsServiceName).isNull();
verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE);
verifyNoMoreInteractions(cdsWatcher, edsWatcher); verifyNoMoreInteractions(cdsWatcher, edsWatcher);
} }
@ -1535,13 +1555,18 @@ public abstract class ClientXdsClientTestBase {
protected abstract List<? extends Message> buildOpaqueRoutes(int num); protected abstract List<? extends Message> buildOpaqueRoutes(int num);
protected abstract Message buildEdsCluster(String clusterName, @Nullable String edsServiceName, protected abstract Message buildEdsCluster(String clusterName, @Nullable String edsServiceName,
boolean enableLrs, @Nullable Message upstreamTlsContext, String lbPolicy, @Nullable Message ringHashLbConfig, boolean enableLrs,
@Nullable Message circuitBreakers);
protected abstract Message buildLogicalDnsCluster(String clusterName, boolean enableLrs,
@Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers); @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers);
protected abstract Message buildAggregateCluster(String clusterName, List<String> clusters); protected abstract Message buildLogicalDnsCluster(String clusterName, String lbPolicy,
@Nullable Message ringHashLbConfig, boolean enableLrs,
@Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers);
protected abstract Message buildAggregateCluster(String clusterName, String lbPolicy,
@Nullable Message ringHashLbConfig, List<String> clusters);
protected abstract Message buildRingHashLbConfig(String hashFunction, long minRingSize,
long maxRingSize);
protected abstract Message buildUpstreamTlsContext(String secretName, String targetUri); protected abstract Message buildUpstreamTlsContext(String secretName, String targetUri);

View File

@ -27,12 +27,15 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any; import com.google.protobuf.Any;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.UInt32Value; import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import com.google.protobuf.util.Durations; import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.api.v2.Cluster; import io.envoyproxy.envoy.api.v2.Cluster;
import io.envoyproxy.envoy.api.v2.Cluster.CustomClusterType; import io.envoyproxy.envoy.api.v2.Cluster.CustomClusterType;
import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType; import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType;
import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig; import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig;
import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy; import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy;
import io.envoyproxy.envoy.api.v2.Cluster.RingHashLbConfig;
import io.envoyproxy.envoy.api.v2.Cluster.RingHashLbConfig.HashFunction;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy;
import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload;
@ -384,10 +387,10 @@ public class ClientXdsClientV2Test extends ClientXdsClientTestBase {
@Override @Override
protected Message buildEdsCluster(String clusterName, @Nullable String edsServiceName, protected Message buildEdsCluster(String clusterName, @Nullable String edsServiceName,
boolean enableLrs, @Nullable Message upstreamTlsContext, String lbPolicy, @Nullable Message ringHashLbConfig, boolean enableLrs,
@Nullable Message circuitBreakers) { @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) {
Cluster.Builder builder = Cluster.Builder builder = initClusterBuilder(clusterName, lbPolicy, ringHashLbConfig,
initClusterBuilder(clusterName, enableLrs, upstreamTlsContext, circuitBreakers); enableLrs, upstreamTlsContext, circuitBreakers);
builder.setType(DiscoveryType.EDS); builder.setType(DiscoveryType.EDS);
EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder();
edsClusterConfigBuilder.setEdsConfig( edsClusterConfigBuilder.setEdsConfig(
@ -400,34 +403,49 @@ public class ClientXdsClientV2Test extends ClientXdsClientTestBase {
} }
@Override @Override
protected Message buildLogicalDnsCluster(String clusterName, boolean enableLrs, protected Message buildLogicalDnsCluster(String clusterName, String lbPolicy,
@Nullable Message ringHashLbConfig, boolean enableLrs,
@Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) {
Cluster.Builder builder = Cluster.Builder builder = initClusterBuilder(clusterName, lbPolicy, ringHashLbConfig,
initClusterBuilder(clusterName, enableLrs, upstreamTlsContext, circuitBreakers); enableLrs, upstreamTlsContext, circuitBreakers);
builder.setType(DiscoveryType.LOGICAL_DNS); builder.setType(DiscoveryType.LOGICAL_DNS);
return builder.build(); return builder.build();
} }
@Override @Override
protected Message buildAggregateCluster(String clusterName, List<String> clusters) { protected Message buildAggregateCluster(String clusterName, String lbPolicy,
@Nullable Message ringHashLbConfig, List<String> clusters) {
ClusterConfig clusterConfig = ClusterConfig.newBuilder().addAllClusters(clusters).build(); ClusterConfig clusterConfig = ClusterConfig.newBuilder().addAllClusters(clusters).build();
CustomClusterType type = CustomClusterType type =
CustomClusterType.newBuilder() CustomClusterType.newBuilder()
.setName(ClientXdsClient.AGGREGATE_CLUSTER_TYPE_NAME) .setName(ClientXdsClient.AGGREGATE_CLUSTER_TYPE_NAME)
.setTypedConfig(Any.pack(clusterConfig)) .setTypedConfig(Any.pack(clusterConfig))
.build(); .build();
return Cluster.newBuilder() Cluster.Builder builder = Cluster.newBuilder().setName(clusterName).setClusterType(type);
.setName(clusterName) if (lbPolicy.equals("round_robin")) {
.setLbPolicy(LbPolicy.ROUND_ROBIN) builder.setLbPolicy(LbPolicy.ROUND_ROBIN);
.setClusterType(type) } else if (lbPolicy.equals("ring_hash")) {
.build(); builder.setLbPolicy(LbPolicy.RING_HASH);
builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig);
} else {
throw new AssertionError("Invalid LB policy");
}
return builder.build();
} }
private Cluster.Builder initClusterBuilder(String clusterName, boolean enableLrs, private Cluster.Builder initClusterBuilder(String clusterName, String lbPolicy,
@Nullable Message ringHashLbConfig, boolean enableLrs,
@Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) {
Cluster.Builder builder = Cluster.newBuilder(); Cluster.Builder builder = Cluster.newBuilder();
builder.setName(clusterName); builder.setName(clusterName);
if (lbPolicy.equals("round_robin")) {
builder.setLbPolicy(LbPolicy.ROUND_ROBIN); builder.setLbPolicy(LbPolicy.ROUND_ROBIN);
} else if (lbPolicy.equals("ring_hash")) {
builder.setLbPolicy(LbPolicy.RING_HASH);
builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig);
} else {
throw new AssertionError("Invalid LB policy");
}
if (enableLrs) { if (enableLrs) {
builder.setLrsServer( builder.setLrsServer(
ConfigSource.newBuilder() ConfigSource.newBuilder()
@ -445,6 +463,22 @@ public class ClientXdsClientV2Test extends ClientXdsClientTestBase {
return builder; return builder;
} }
@Override
protected Message buildRingHashLbConfig(String hashFunction, long minRingSize,
long maxRingSize) {
RingHashLbConfig.Builder builder = RingHashLbConfig.newBuilder();
if (hashFunction.equals("xx_hash")) {
builder.setHashFunction(HashFunction.XX_HASH);
} else if (hashFunction.equals("murmur_hash_2")) {
builder.setHashFunction(HashFunction.MURMUR_HASH_2);
} else {
throw new AssertionError("Invalid hash function");
}
builder.setMinimumRingSize(UInt64Value.newBuilder().setValue(minRingSize).build());
builder.setMaximumRingSize(UInt64Value.newBuilder().setValue(maxRingSize).build());
return builder.build();
}
@Override @Override
protected Message buildUpstreamTlsContext(String secretName, String targetUri) { protected Message buildUpstreamTlsContext(String secretName, String targetUri) {
GrpcService grpcService = GrpcService grpcService =

View File

@ -27,6 +27,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any; import com.google.protobuf.Any;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.UInt32Value; import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import com.google.protobuf.util.Durations; import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers; import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers;
import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds;
@ -35,6 +36,8 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster.CustomClusterType;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig; import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig.HashFunction;
import io.envoyproxy.envoy.config.core.v3.Address; import io.envoyproxy.envoy.config.core.v3.Address;
import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource;
import io.envoyproxy.envoy.config.core.v3.ApiConfigSource; import io.envoyproxy.envoy.config.core.v3.ApiConfigSource;
@ -382,10 +385,10 @@ public class ClientXdsClientV3Test extends ClientXdsClientTestBase {
@Override @Override
protected Message buildEdsCluster(String clusterName, @Nullable String edsServiceName, protected Message buildEdsCluster(String clusterName, @Nullable String edsServiceName,
boolean enableLrs, @Nullable Message upstreamTlsContext, String lbPolicy, @Nullable Message ringHashLbConfig, boolean enableLrs,
@Nullable Message circuitBreakers) { @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) {
Cluster.Builder builder = Cluster.Builder builder = initClusterBuilder(clusterName, lbPolicy, ringHashLbConfig,
initClusterBuilder(clusterName, enableLrs, upstreamTlsContext, circuitBreakers); enableLrs, upstreamTlsContext, circuitBreakers);
builder.setType(DiscoveryType.EDS); builder.setType(DiscoveryType.EDS);
EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder();
edsClusterConfigBuilder.setEdsConfig( edsClusterConfigBuilder.setEdsConfig(
@ -398,34 +401,49 @@ public class ClientXdsClientV3Test extends ClientXdsClientTestBase {
} }
@Override @Override
protected Message buildLogicalDnsCluster(String clusterName, boolean enableLrs, protected Message buildLogicalDnsCluster(String clusterName, String lbPolicy,
@Nullable Message ringHashLbConfig, boolean enableLrs,
@Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) {
Cluster.Builder builder = Cluster.Builder builder = initClusterBuilder(clusterName, lbPolicy, ringHashLbConfig,
initClusterBuilder(clusterName, enableLrs, upstreamTlsContext, circuitBreakers); enableLrs, upstreamTlsContext, circuitBreakers);
builder.setType(DiscoveryType.LOGICAL_DNS); builder.setType(DiscoveryType.LOGICAL_DNS);
return builder.build(); return builder.build();
} }
@Override @Override
protected Message buildAggregateCluster(String clusterName, List<String> clusters) { protected Message buildAggregateCluster(String clusterName, String lbPolicy,
@Nullable Message ringHashLbConfig, List<String> clusters) {
ClusterConfig clusterConfig = ClusterConfig.newBuilder().addAllClusters(clusters).build(); ClusterConfig clusterConfig = ClusterConfig.newBuilder().addAllClusters(clusters).build();
CustomClusterType type = CustomClusterType type =
CustomClusterType.newBuilder() CustomClusterType.newBuilder()
.setName(ClientXdsClient.AGGREGATE_CLUSTER_TYPE_NAME) .setName(ClientXdsClient.AGGREGATE_CLUSTER_TYPE_NAME)
.setTypedConfig(Any.pack(clusterConfig)) .setTypedConfig(Any.pack(clusterConfig))
.build(); .build();
return Cluster.newBuilder() Cluster.Builder builder = Cluster.newBuilder().setName(clusterName).setClusterType(type);
.setName(clusterName) if (lbPolicy.equals("round_robin")) {
.setLbPolicy(LbPolicy.ROUND_ROBIN) builder.setLbPolicy(LbPolicy.ROUND_ROBIN);
.setClusterType(type) } else if (lbPolicy.equals("ring_hash")) {
.build(); builder.setLbPolicy(LbPolicy.RING_HASH);
builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig);
} else {
throw new AssertionError("Invalid LB policy");
}
return builder.build();
} }
private Cluster.Builder initClusterBuilder(String clusterName, boolean enableLrs, private Cluster.Builder initClusterBuilder(String clusterName, String lbPolicy,
@Nullable Message ringHashLbConfig, boolean enableLrs,
@Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) {
Cluster.Builder builder = Cluster.newBuilder(); Cluster.Builder builder = Cluster.newBuilder();
builder.setName(clusterName); builder.setName(clusterName);
if (lbPolicy.equals("round_robin")) {
builder.setLbPolicy(LbPolicy.ROUND_ROBIN); builder.setLbPolicy(LbPolicy.ROUND_ROBIN);
} else if (lbPolicy.equals("ring_hash")) {
builder.setLbPolicy(LbPolicy.RING_HASH);
builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig);
} else {
throw new AssertionError("Invalid LB policy");
}
if (enableLrs) { if (enableLrs) {
builder.setLrsServer( builder.setLrsServer(
ConfigSource.newBuilder() ConfigSource.newBuilder()
@ -443,6 +461,22 @@ public class ClientXdsClientV3Test extends ClientXdsClientTestBase {
return builder; return builder;
} }
@Override
protected Message buildRingHashLbConfig(String hashFunction, long minRingSize,
long maxRingSize) {
RingHashLbConfig.Builder builder = RingHashLbConfig.newBuilder();
if (hashFunction.equals("xx_hash")) {
builder.setHashFunction(HashFunction.XX_HASH);
} else if (hashFunction.equals("murmur_hash_2")) {
builder.setHashFunction(HashFunction.MURMUR_HASH_2);
} else {
throw new AssertionError("Invalid hash function");
}
builder.setMinimumRingSize(UInt64Value.newBuilder().setValue(minRingSize).build());
builder.setMaximumRingSize(UInt64Value.newBuilder().setValue(maxRingSize).build());
return builder.build();
}
@Override @Override
protected Message buildUpstreamTlsContext(String secretName, String targetUri) { protected Message buildUpstreamTlsContext(String secretName, String targetUri) {
GrpcService grpcService = GrpcService grpcService =

View File

@ -56,6 +56,7 @@ import io.grpc.xds.Matchers.PathMatcher;
import io.grpc.xds.VirtualHost.Route; import io.grpc.xds.VirtualHost.Route;
import io.grpc.xds.VirtualHost.Route.RouteAction; import io.grpc.xds.VirtualHost.Route.RouteAction;
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight; import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
import io.grpc.xds.VirtualHost.Route.RouteMatch; import io.grpc.xds.VirtualHost.Route.RouteMatch;
import io.grpc.xds.XdsClient.RdsResourceWatcher; import io.grpc.xds.XdsClient.RdsResourceWatcher;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
@ -238,9 +239,11 @@ public class XdsNameResolverTest {
private List<VirtualHost> buildUnmatchedVirtualHosts() { private List<VirtualHost> buildUnmatchedVirtualHosts() {
Route route1 = Route.create(RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), Route route1 = Route.create(RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null); RouteAction.forCluster(
cluster2, Collections.<HashPolicy>emptyList(), TimeUnit.SECONDS.toNanos(15L)), null);
Route route2 = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), Route route2 = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster(cluster1, TimeUnit.SECONDS.toNanos(15L)), null); RouteAction.forCluster(
cluster1, Collections.<HashPolicy>emptyList(), TimeUnit.SECONDS.toNanos(15L)), null);
return Arrays.asList( return Arrays.asList(
VirtualHost.create("virtualhost-foo", Collections.singletonList("hello.googleapis.com"), VirtualHost.create("virtualhost-foo", Collections.singletonList("hello.googleapis.com"),
Collections.singletonList(route1), null), Collections.singletonList(route1), null),
@ -253,7 +256,8 @@ public class XdsNameResolverTest {
resolver.start(mockListener); resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
Route route = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), Route route = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster(cluster1, null), null); // per-route timeout unset RouteAction.forCluster(
cluster1, Collections.<HashPolicy>emptyList(), null), null); // per-route timeout unset
VirtualHost virtualHost = VirtualHost.create("does not matter", VirtualHost virtualHost = VirtualHost.create("does not matter",
Collections.singletonList(AUTHORITY), Collections.singletonList(route), null); Collections.singletonList(AUTHORITY), Collections.singletonList(route), null);
xdsClient.deliverLdsUpdate(AUTHORITY, 0L, Collections.singletonList(virtualHost)); xdsClient.deliverLdsUpdate(AUTHORITY, 0L, Collections.singletonList(virtualHost));
@ -268,7 +272,8 @@ public class XdsNameResolverTest {
resolver.start(mockListener); resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
Route route = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), Route route = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster(cluster1, null), null); // per-route timeout unset RouteAction.forCluster(
cluster1, Collections.<HashPolicy>emptyList(), null), null); // per-route timeout unset
VirtualHost virtualHost = VirtualHost.create("does not matter", VirtualHost virtualHost = VirtualHost.create("does not matter",
Collections.singletonList(AUTHORITY), Collections.singletonList(route), null); Collections.singletonList(AUTHORITY), Collections.singletonList(route), null);
xdsClient.deliverLdsUpdate(AUTHORITY, TimeUnit.SECONDS.toNanos(5L), xdsClient.deliverLdsUpdate(AUTHORITY, TimeUnit.SECONDS.toNanos(5L),
@ -314,10 +319,14 @@ public class XdsNameResolverTest {
Arrays.asList( Arrays.asList(
Route.create( Route.create(
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster("another-cluster", TimeUnit.SECONDS.toNanos(20L)), null), RouteAction.forCluster(
"another-cluster", Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(20L)), null),
Route.create( Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); RouteAction.forCluster(
cluster2, Collections.<HashPolicy>emptyList(), TimeUnit.SECONDS.toNanos(15L)),
null)));
verify(mockListener).onResult(resolutionResultCaptor.capture()); verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue(); ResolutionResult result = resolutionResultCaptor.getValue();
// Updated service config still contains cluster1 while it is removed resource. New calls no // Updated service config still contains cluster1 while it is removed resource. New calls no
@ -349,10 +358,13 @@ public class XdsNameResolverTest {
Arrays.asList( Arrays.asList(
Route.create( Route.create(
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster("another-cluster", TimeUnit.SECONDS.toNanos(20L)), null), RouteAction.forCluster(
"another-cluster", Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(20L)), null),
Route.create( Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); RouteAction.forCluster(cluster2, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null)));
// Two consecutive service config updates: one for removing clcuster1, // Two consecutive service config updates: one for removing clcuster1,
// one for adding "another=cluster". // one for adding "another=cluster".
verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture()); verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture());
@ -380,10 +392,12 @@ public class XdsNameResolverTest {
Arrays.asList( Arrays.asList(
Route.create( Route.create(
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster("another-cluster", TimeUnit.SECONDS.toNanos(20L)), null), RouteAction.forCluster("another-cluster", Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(20L)), null),
Route.create( Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); RouteAction.forCluster(cluster2, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null)));
verify(mockListener).onResult(resolutionResultCaptor.capture()); verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue(); ResolutionResult result = resolutionResultCaptor.getValue();
@ -396,10 +410,12 @@ public class XdsNameResolverTest {
Arrays.asList( Arrays.asList(
Route.create( Route.create(
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster("another-cluster", TimeUnit.SECONDS.toNanos(15L)), null), RouteAction.forCluster("another-cluster", Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null),
Route.create( Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); RouteAction.forCluster(cluster2, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null)));
verifyNoMoreInteractions(mockListener); // no cluster added/deleted verifyNoMoreInteractions(mockListener); // no cluster added/deleted
assertCallSelectResult(call1, configSelector, "another-cluster", 15.0); assertCallSelectResult(call1, configSelector, "another-cluster", 15.0);
} }
@ -414,16 +430,19 @@ public class XdsNameResolverTest {
Collections.singletonList( Collections.singletonList(
Route.create( Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); RouteAction.forCluster(cluster2, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null)));
xdsClient.deliverLdsUpdate( xdsClient.deliverLdsUpdate(
AUTHORITY, AUTHORITY,
Arrays.asList( Arrays.asList(
Route.create( Route.create(
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster(cluster1, TimeUnit.SECONDS.toNanos(15L)), null), RouteAction.forCluster(cluster1, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null),
Route.create( Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); RouteAction.forCluster(cluster2, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null)));
testCall.deliverErrorStatus(); testCall.deliverErrorStatus();
verifyNoMoreInteractions(mockListener); verifyNoMoreInteractions(mockListener);
} }
@ -443,6 +462,7 @@ public class XdsNameResolverTest {
Arrays.asList( Arrays.asList(
ClusterWeight.create(cluster1, 20, null), ClusterWeight.create(cluster1, 20, null),
ClusterWeight.create(cluster2, 80, null)), ClusterWeight.create(cluster2, 80, null)),
Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(20L)), null))); TimeUnit.SECONDS.toNanos(20L)), null)));
verify(mockListener).onResult(resolutionResultCaptor.capture()); verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue(); ResolutionResult result = resolutionResultCaptor.getValue();
@ -500,10 +520,12 @@ public class XdsNameResolverTest {
Arrays.asList( Arrays.asList(
Route.create( Route.create(
RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()),
RouteAction.forCluster(cluster1, TimeUnit.SECONDS.toNanos(15L)), null), RouteAction.forCluster(cluster1, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null),
Route.create( Route.create(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); RouteAction.forCluster(cluster2, Collections.<HashPolicy>emptyList(),
TimeUnit.SECONDS.toNanos(15L)), null)));
verify(mockListener).onResult(resolutionResultCaptor.capture()); verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue(); ResolutionResult result = resolutionResultCaptor.getValue();
assertThat(result.getAddresses()).isEmpty(); assertThat(result.getAddresses()).isEmpty();