diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 62f2b49249..4ba0a32030 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -34,11 +34,7 @@ import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsUpdate; -import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.ClusterConfig; import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; -import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig; import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayDeque; @@ -153,17 +149,16 @@ final class CdsLoadBalancer2 extends LoadBalancer { } if (clusterState.isLeaf) { DiscoveryMechanism instance; - if (clusterState.result instanceof EdsClusterConfig) { - EdsClusterConfig clusterConfig = (EdsClusterConfig) clusterState.result; - instance = DiscoveryMechanism.forEds(clusterState.name, clusterConfig.edsServiceName, - clusterConfig.lrsServerName, clusterConfig.maxConcurrentRequests, - clusterConfig.upstreamTlsContext); + if (clusterState.result.clusterType() == ClusterType.EDS) { + instance = DiscoveryMechanism.forEds( + clusterState.name, clusterState.result.edsServiceName(), + clusterState.result.lrsServerName(), clusterState.result.maxConcurrentRequests(), + clusterState.result.upstreamTlsContext()); } else { // logical DNS - LogicalDnsClusterConfig clusterConfig = - (LogicalDnsClusterConfig) clusterState.result; - instance = DiscoveryMechanism.forLogicalDns(clusterState.name, - clusterConfig.lrsServerName, clusterConfig.maxConcurrentRequests, - clusterConfig.upstreamTlsContext); + instance = DiscoveryMechanism.forLogicalDns( + clusterState.name, clusterState.result.lrsServerName(), + clusterState.result.maxConcurrentRequests(), + clusterState.result.upstreamTlsContext()); } instances.add(instance); } else { @@ -183,7 +178,7 @@ final class CdsLoadBalancer2 extends LoadBalancer { helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable)); return; } - String endpointPickingPolicy = root.result.lbPolicy; + String endpointPickingPolicy = root.result.lbPolicy(); LoadBalancerProvider localityPickingLbProvider = lbRegistry.getProvider(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded LoadBalancerProvider endpointPickingLbProvider = @@ -212,7 +207,7 @@ final class CdsLoadBalancer2 extends LoadBalancer { @Nullable private Map childClusterStates; @Nullable - private ClusterConfig result; + private CdsUpdate result; // Following fields are effectively final. private boolean isLeaf; private boolean discovered; @@ -281,15 +276,15 @@ final class CdsLoadBalancer2 extends LoadBalancer { if (shutdown) { return; } + logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update); discovered = true; - result = update.clusterConfig; - if (update.clusterType == ClusterType.AGGREGATE) { + result = update; + if (update.clusterType() == ClusterType.AGGREGATE) { isLeaf = false; - AggregateClusterConfig clusterConfig = (AggregateClusterConfig) update.clusterConfig; - logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}", update.clusterName); - logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig); + logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}", + update.clusterName(), update.prioritizedClusterNames()); Map newChildStates = new LinkedHashMap<>(); - for (String cluster : clusterConfig.prioritizedClusterNames) { + for (String cluster : update.prioritizedClusterNames()) { if (childClusterStates == null || !childClusterStates.containsKey(cluster)) { ClusterState childState = new ClusterState(cluster); childState.start(); @@ -304,18 +299,13 @@ final class CdsLoadBalancer2 extends LoadBalancer { } } childClusterStates = newChildStates; - } else if (update.clusterType == ClusterType.EDS) { + } else if (update.clusterType() == ClusterType.EDS) { isLeaf = true; - EdsClusterConfig clusterConfig = (EdsClusterConfig) update.clusterConfig; logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}", - update.clusterName, clusterConfig.edsServiceName); - logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig); + update.clusterName(), update.edsServiceName()); } else { // logical DNS isLeaf = true; - LogicalDnsClusterConfig clusterConfig = - (LogicalDnsClusterConfig) update.clusterConfig; - logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName); - logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig); + logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName()); } handleClusterDiscovered(); } diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 324c49d532..5ba10c329e 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.xds.EnvoyProtoData.TRANSPORT_SOCKET_NAME_TLS; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.CaseFormat; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; @@ -34,6 +35,7 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.cluster.v3.Cluster.CustomClusterType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig; import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions; import io.envoyproxy.envoy.config.core.v3.RoutingPriority; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; @@ -65,11 +67,9 @@ import io.grpc.xds.Matchers.PathMatcher; import io.grpc.xds.VirtualHost.Route; import io.grpc.xds.VirtualHost.Route.RouteAction; import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight; +import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy; import io.grpc.xds.VirtualHost.Route.RouteMatch; -import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; -import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig; +import io.grpc.xds.XdsClient.CdsUpdate.HashFunction; import io.grpc.xds.XdsLogger.XdsLogLevel; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -96,6 +96,8 @@ final class ClientXdsClient extends AbstractXdsClient { @VisibleForTesting static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate"; private static final String HTTP_FAULT_FILTER_NAME = "envoy.fault"; + @VisibleForTesting + static final String HASH_POLICY_FILTER_STATE_KEY = "io.grpc.channel_id"; private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 = "type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2" + ".HttpConnectionManager"; @@ -468,10 +470,42 @@ final class ClientXdsClient extends AbstractXdsClient { timeoutNano = Durations.toNanos(maxStreamDuration.getMaxStreamDuration()); } } - List weightedClusters; + List hashPolicies = new ArrayList<>(); + for (io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy config + : proto.getHashPolicyList()) { + HashPolicy policy = null; + boolean terminal = config.getTerminal(); + switch (config.getPolicySpecifierCase()) { + case HEADER: + io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.Header headerCfg = + config.getHeader(); + Pattern regEx = null; + String regExSubstitute = null; + if (headerCfg.hasRegexRewrite() && headerCfg.getRegexRewrite().hasPattern() + && headerCfg.getRegexRewrite().getPattern().hasGoogleRe2()) { + regEx = Pattern.compile(headerCfg.getRegexRewrite().getPattern().getRegex()); + regExSubstitute = headerCfg.getRegexRewrite().getSubstitution(); + } + policy = HashPolicy.forHeader( + terminal, headerCfg.getHeaderName(), regEx, regExSubstitute); + break; + case FILTER_STATE: + if (config.getFilterState().getKey().equals(HASH_POLICY_FILTER_STATE_KEY)) { + policy = HashPolicy.forChannelId(terminal); + } + break; + default: + // Ignore + } + if (policy != null) { + hashPolicies.add(policy); + } + } + switch (proto.getClusterSpecifierCase()) { case CLUSTER: - return StructOrError.fromStruct(RouteAction.forCluster(proto.getCluster(), timeoutNano)); + return StructOrError.fromStruct(RouteAction.forCluster( + proto.getCluster(), hashPolicies, timeoutNano)); case CLUSTER_HEADER: return null; case WEIGHTED_CLUSTERS: @@ -480,7 +514,7 @@ final class ClientXdsClient extends AbstractXdsClient { if (clusterWeights.isEmpty()) { return StructOrError.fromError("No cluster found in weighted cluster list"); } - weightedClusters = new ArrayList<>(); + List weightedClusters = new ArrayList<>(); for (io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight clusterWeight : clusterWeights) { StructOrError clusterWeightOrError = parseClusterWeight(clusterWeight); @@ -492,7 +526,7 @@ final class ClientXdsClient extends AbstractXdsClient { } // TODO(chengyuanzhang): validate if the sum of weights equals to total weight. return StructOrError.fromStruct(RouteAction.forWeightedClusters( - weightedClusters, timeoutNano)); + weightedClusters, hashPolicies, timeoutNano)); case CLUSTERSPECIFIER_NOT_SET: default: return StructOrError.fromError( @@ -713,30 +747,44 @@ final class ClientXdsClient extends AbstractXdsClient { if (!cdsResourceSubscribers.containsKey(clusterName)) { continue; } - // The lb_policy field must be set to ROUND_ROBIN. - if (!cluster.getLbPolicy().equals(LbPolicy.ROUND_ROBIN)) { - nackResponse(ResourceType.CDS, nonce, - "Cluster " + clusterName + ": unsupported Lb policy: " + cluster.getLbPolicy()); - return; - } - String lbPolicy = "round_robin"; - CdsUpdate update = null; + StructOrError structOrError; switch (cluster.getClusterDiscoveryTypeCase()) { case TYPE: - update = parseNonAggregateCluster(cluster, nonce, lbPolicy, edsResources); + structOrError = parseNonAggregateCluster(cluster, edsResources); break; case CLUSTER_TYPE: - update = parseAggregateCluster(cluster, nonce, lbPolicy); + structOrError = parseAggregateCluster(cluster); break; case CLUSTERDISCOVERYTYPE_NOT_SET: default: nackResponse(ResourceType.CDS, nonce, "Cluster " + clusterName + ": cluster discovery type unspecified"); + return; } - if (update == null) { + if (structOrError.getErrorDetail() != null) { + nackResponse(ResourceType.CDS, nonce, structOrError.errorDetail); return; } - cdsUpdates.put(clusterName, update); + CdsUpdate.Builder updateBuilder = structOrError.getStruct(); + String lbPolicy = CaseFormat.UPPER_UNDERSCORE.to( + CaseFormat.LOWER_UNDERSCORE, cluster.getLbPolicy().name()); + if (cluster.getLbPolicy() == LbPolicy.RING_HASH) { + HashFunction hashFunction; + RingHashLbConfig lbConfig = cluster.getRingHashLbConfig(); + if (lbConfig.getHashFunction() == RingHashLbConfig.HashFunction.XX_HASH) { + hashFunction = HashFunction.XX_HASH; + } else { + nackResponse(ResourceType.CDS, nonce, + "Cluster " + clusterName + ": unsupported ring hash function: " + + lbConfig.getHashFunction()); + return; + } + updateBuilder.lbPolicy(lbPolicy, lbConfig.getMinimumRingSize().getValue(), + lbConfig.getMaximumRingSize().getValue(), hashFunction); + } else { + updateBuilder.lbPolicy(lbPolicy); + } + cdsUpdates.put(clusterName, updateBuilder.build()); } ackResponse(ResourceType.CDS, versionInfo, nonce); @@ -756,18 +804,13 @@ final class ClientXdsClient extends AbstractXdsClient { } } - /** - * Parses CDS resource for an aggregate cluster into {@link io.grpc.xds.XdsClient.CdsUpdate}. - * Returns {@code null} and nack the response with the given nonce if the resource is invalid. - */ - private CdsUpdate parseAggregateCluster(Cluster cluster, String nonce, String lbPolicy) { + private static StructOrError parseAggregateCluster(Cluster cluster) { String clusterName = cluster.getName(); CustomClusterType customType = cluster.getClusterType(); String typeName = customType.getName(); if (!typeName.equals(AGGREGATE_CLUSTER_TYPE_NAME)) { - nackResponse(ResourceType.CDS, nonce, + return StructOrError.fromError( "Cluster " + clusterName + ": unsupported custom cluster type: " + typeName); - return null; } io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig clusterConfig; Any unpackedClusterConfig = customType.getTypedConfig(); @@ -779,31 +822,23 @@ final class ClientXdsClient extends AbstractXdsClient { clusterConfig = unpackedClusterConfig.unpack( io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig.class); } catch (InvalidProtocolBufferException e) { - nackResponse(ResourceType.CDS, nonce, - "Cluster " + clusterName + ": invalid cluster config: " + e); + StructOrError.fromError("Cluster " + clusterName + ": malformed ClusterConfig: " + e); return null; } - AggregateClusterConfig config = - new AggregateClusterConfig(lbPolicy, clusterConfig.getClustersList()); - return new CdsUpdate(clusterName, ClusterType.AGGREGATE, config); + return StructOrError.fromStruct(CdsUpdate.forAggregate( + clusterName, clusterConfig.getClustersList())); } - /** - * Parses CDS resource for a non-aggregate cluster (EDS or Logical DNS) into {@link - * io.grpc.xds.XdsClient.CdsUpdate}. Returns {@code null} and nack the response with the given - * nonce if the resource is invalid. - */ - private CdsUpdate parseNonAggregateCluster(Cluster cluster, String nonce, String lbPolicy, - Set edsResources) { + private static StructOrError parseNonAggregateCluster( + Cluster cluster, Set edsResources) { String clusterName = cluster.getName(); String lrsServerName = null; Long maxConcurrentRequests = null; UpstreamTlsContext upstreamTlsContext = null; if (cluster.hasLrsServer()) { if (!cluster.getLrsServer().hasSelf()) { - nackResponse(ResourceType.CDS, nonce, + return StructOrError.fromError( "Cluster " + clusterName + ": only support LRS for the same management server"); - return null; } lrsServerName = ""; } @@ -829,9 +864,8 @@ final class ClientXdsClient extends AbstractXdsClient { unpacked = any.unpack( io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext.class); } catch (InvalidProtocolBufferException e) { - nackResponse(ResourceType.CDS, nonce, - "Cluster " + clusterName + ": invalid upstream TLS context: " + e); - return null; + return StructOrError.fromError( + "Cluster " + clusterName + ": malformed UpstreamTlsContext: " + e); } upstreamTlsContext = UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext(unpacked); } @@ -842,9 +876,8 @@ final class ClientXdsClient extends AbstractXdsClient { io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig edsClusterConfig = cluster.getEdsClusterConfig(); if (!edsClusterConfig.getEdsConfig().hasAds()) { - nackResponse(ResourceType.CDS, nonce, "Cluster " + clusterName + ": " - + "field eds_cluster_config must be set to indicate to use EDS over ADS."); - return null; + return StructOrError.fromError("Cluster " + clusterName + + ": field eds_cluster_config must be set to indicate to use EDS over ADS."); } // If the service_name field is set, that value will be used for the EDS request. if (!edsClusterConfig.getServiceName().isEmpty()) { @@ -853,17 +886,14 @@ final class ClientXdsClient extends AbstractXdsClient { } else { edsResources.add(clusterName); } - EdsClusterConfig config = new EdsClusterConfig(lbPolicy, edsServiceName, - lrsServerName, maxConcurrentRequests, upstreamTlsContext); - return new CdsUpdate(clusterName, ClusterType.EDS, config); + return StructOrError.fromStruct(CdsUpdate.forEds( + clusterName, edsServiceName, lrsServerName, maxConcurrentRequests, upstreamTlsContext)); } else if (type.equals(DiscoveryType.LOGICAL_DNS)) { - LogicalDnsClusterConfig config = new LogicalDnsClusterConfig(lbPolicy, lrsServerName, - maxConcurrentRequests, upstreamTlsContext); - return new CdsUpdate(clusterName, ClusterType.LOGICAL_DNS, config); + return StructOrError.fromStruct(CdsUpdate.forLogicalDns( + clusterName, lrsServerName, maxConcurrentRequests, upstreamTlsContext)); } - nackResponse(ResourceType.CDS, nonce, + return StructOrError.fromError( "Cluster " + clusterName + ": unsupported built-in discovery type: " + type); - return null; } @Override diff --git a/xds/src/main/java/io/grpc/xds/VirtualHost.java b/xds/src/main/java/io/grpc/xds/VirtualHost.java index f80106b2d3..30e08d5e23 100644 --- a/xds/src/main/java/io/grpc/xds/VirtualHost.java +++ b/xds/src/main/java/io/grpc/xds/VirtualHost.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.re2j.Pattern; import io.grpc.xds.Matchers.FractionMatcher; import io.grpc.xds.Matchers.HeaderMatcher; import io.grpc.xds.Matchers.PathMatcher; @@ -89,6 +90,9 @@ abstract class VirtualHost { @AutoValue abstract static class RouteAction { + // List of hash policies to use for ring hash load balancing. + abstract ImmutableList hashPolicies(); + @Nullable abstract Long timeoutNano(); @@ -98,21 +102,23 @@ abstract class VirtualHost { @Nullable abstract ImmutableList weightedClusters(); - static RouteAction forCluster(String cluster, @Nullable Long timeoutNano) { + static RouteAction forCluster(String cluster, List hashPolicies, + @Nullable Long timeoutNano) { checkNotNull(cluster, "cluster"); - return RouteAction.create(timeoutNano, cluster, null); + return RouteAction.create(hashPolicies, timeoutNano, cluster, null); } static RouteAction forWeightedClusters(List weightedClusters, - @Nullable Long timeoutNano) { + List hashPolicies, @Nullable Long timeoutNano) { checkNotNull(weightedClusters, "weightedClusters"); checkArgument(!weightedClusters.isEmpty(), "empty cluster list"); - return RouteAction.create(timeoutNano, null, weightedClusters); + return RouteAction.create(hashPolicies, timeoutNano, null, weightedClusters); } - private static RouteAction create(@Nullable Long timeoutNano, @Nullable String cluster, - @Nullable List weightedClusters) { - return new AutoValue_VirtualHost_Route_RouteAction(timeoutNano, cluster, + private static RouteAction create(List hashPolicies, @Nullable Long timeoutNano, + @Nullable String cluster, @Nullable List weightedClusters) { + return new AutoValue_VirtualHost_Route_RouteAction( + ImmutableList.copyOf(hashPolicies), timeoutNano, cluster, weightedClusters == null ? null : ImmutableList.copyOf(weightedClusters)); } @@ -130,6 +136,52 @@ abstract class VirtualHost { httpFault); } } + + // Configuration for the route's hashing policy if the upstream cluster uses a hashing load + // balancer. + @AutoValue + abstract static class HashPolicy { + // The specifier that indicates the component of the request to be hashed on. + abstract Type type(); + + // The flag that short-circuits the hash computing. + abstract boolean isTerminal(); + + // The name of the request header that will be used to obtain the hash key. + // Only valid if type is HEADER. + @Nullable + abstract String headerName(); + + // The regular expression used to find portions to be replaced in the header value. + // Only valid if type is HEADER. + @Nullable + abstract Pattern regEx(); + + // The string that should be substituted into matching portions of the header value. + // Only valid if type is HEADER. + @Nullable + abstract String regExSubstitution(); + + static HashPolicy forHeader(boolean isTerminal, String headerName, + @Nullable Pattern regEx, @Nullable String regExSubstitution) { + checkNotNull(headerName, "headerName"); + return HashPolicy.create(Type.HEADER, isTerminal, headerName, regEx, regExSubstitution); + } + + static HashPolicy forChannelId(boolean isTerminal) { + return HashPolicy.create(Type.CHANNEL_ID, isTerminal, null, null, null); + } + + private static HashPolicy create(Type type, boolean isTerminal, @Nullable String headerName, + @Nullable Pattern regEx, @Nullable String regExSubstitution) { + return new AutoValue_VirtualHost_Route_RouteAction_HashPolicy(type, isTerminal, + headerName, regEx, regExSubstitution); + } + + enum Type { + HEADER, CHANNEL_ID + } + } } } } diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 4360468db7..4018770dec 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -19,8 +19,10 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.auto.value.AutoValue; import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; +import com.google.common.collect.ImmutableList; import io.grpc.Status; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LocalityLbEndpoints; @@ -155,197 +157,155 @@ abstract class XdsClient { } } - static final class CdsUpdate implements ResourceUpdate { - final String clusterName; - final ClusterType clusterType; - final ClusterConfig clusterConfig; + /** xDS resource update for cluster-level configuration. */ + @AutoValue + abstract static class CdsUpdate implements ResourceUpdate { + abstract String clusterName(); - CdsUpdate(String clusterName, ClusterType clusterType, ClusterConfig clusterConfig) { - this.clusterName = checkNotNull(clusterName, "clusterName"); - this.clusterType = checkNotNull(clusterType, "clusterType"); - this.clusterConfig = checkNotNull(clusterConfig, "clusterConfig"); + abstract ClusterType clusterType(); + + // Endpoint-level load balancing policy. + abstract String lbPolicy(); + + // Only valid if lbPolicy is "ring_hash". + abstract long minRingSize(); + + // Only valid if lbPolicy is "ring_hash". + abstract long maxRingSize(); + + // Only valid if lbPolicy is "ring_hash". + @Nullable + abstract HashFunction hashFunction(); + + // Alternative resource name to be used in EDS requests. + /// Only valid for EDS cluster. + @Nullable + abstract String edsServiceName(); + + // Load report server name for reporting loads via LRS. + // Only valid for EDS or LOGICAL_DNS cluster. + @Nullable + abstract String lrsServerName(); + + // Max number of concurrent requests can be sent to this cluster. + // Only valid for EDS or LOGICAL_DNS cluster. + @Nullable + abstract Long maxConcurrentRequests(); + + // TLS context used to connect to connect to this cluster. + // Only valid for EDS or LOGICAL_DNS cluster. + @Nullable + abstract UpstreamTlsContext upstreamTlsContext(); + + // List of underlying clusters making of this aggregate cluster. + // Only valid for AGGREGATE cluster. + @Nullable + abstract ImmutableList prioritizedClusterNames(); + + static Builder forAggregate(String clusterName, List prioritizedClusterNames) { + checkNotNull(prioritizedClusterNames, "prioritizedClusterNames"); + return new AutoValue_XdsClient_CdsUpdate.Builder() + .clusterName(clusterName) + .clusterType(ClusterType.AGGREGATE) + .minRingSize(0) + .maxRingSize(0) + .prioritizedClusterNames(ImmutableList.copyOf(prioritizedClusterNames)); } - @Override - public int hashCode() { - return Objects.hash(clusterName, clusterType, clusterConfig); + static Builder forEds(String clusterName, @Nullable String edsServiceName, + @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, + @Nullable UpstreamTlsContext upstreamTlsContext) { + return new AutoValue_XdsClient_CdsUpdate.Builder() + .clusterName(clusterName) + .clusterType(ClusterType.EDS) + .minRingSize(0) + .maxRingSize(0) + .edsServiceName(edsServiceName) + .lrsServerName(lrsServerName) + .maxConcurrentRequests(maxConcurrentRequests) + .upstreamTlsContext(upstreamTlsContext); } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - CdsUpdate that = (CdsUpdate) o; - return Objects.equals(clusterName, that.clusterName) - && Objects.equals(clusterType, that.clusterType) - && Objects.equals(clusterConfig, that.clusterConfig); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("clusterName", clusterName) - .add("clusterType", clusterType) - .add("clusterConfig", clusterConfig) - .toString(); + static Builder forLogicalDns(String clusterName, @Nullable String lrsServerName, + @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) { + return new AutoValue_XdsClient_CdsUpdate.Builder() + .clusterName(clusterName) + .clusterType(ClusterType.LOGICAL_DNS) + .minRingSize(0) + .maxRingSize(0) + .lrsServerName(lrsServerName) + .maxConcurrentRequests(maxConcurrentRequests) + .upstreamTlsContext(upstreamTlsContext); } enum ClusterType { EDS, LOGICAL_DNS, AGGREGATE } - abstract static class ClusterConfig { - // Endpoint level load balancing policy. - final String lbPolicy; - - private ClusterConfig(String lbPolicy) { - this.lbPolicy = checkNotNull(lbPolicy, "lbPolicy"); - } + enum HashFunction { + XX_HASH } - static final class AggregateClusterConfig extends ClusterConfig { - // List of underlying clusters making of this aggregate cluster. - final List prioritizedClusterNames; - - AggregateClusterConfig(String lbPolicy, List prioritizedClusterNames) { - super(lbPolicy); - this.prioritizedClusterNames = - Collections.unmodifiableList(new ArrayList<>(prioritizedClusterNames)); - } - - @Override - public int hashCode() { - return Objects.hash(lbPolicy, prioritizedClusterNames); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - AggregateClusterConfig that = (AggregateClusterConfig) o; - return Objects.equals(lbPolicy, that.lbPolicy) - && Objects.equals(prioritizedClusterNames, that.prioritizedClusterNames); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("lbPolicy", lbPolicy) - .add("prioritizedClusterNames", prioritizedClusterNames) - .toString(); - } + // FIXME(chengyuanzhang): delete this after UpstreamTlsContext's toString() is fixed. + @Override + public final String toString() { + return MoreObjects.toStringHelper(this) + .add("clusterName", clusterName()) + .add("clusterType", clusterType()) + .add("lbPolicy", lbPolicy()) + .add("minRingSize", minRingSize()) + .add("maxRingSize", maxRingSize()) + .add("hashFunction", hashFunction()) + .add("edsServiceName", edsServiceName()) + .add("lrsServerName", lrsServerName()) + .add("maxConcurrentRequests", maxConcurrentRequests()) + // Exclude upstreamTlsContext as its string representation is cumbersome. + .add("prioritizedClusterNames", prioritizedClusterNames()) + .toString(); } - private abstract static class NonAggregateClusterConfig extends ClusterConfig { - // Load report server name for reporting loads via LRS. - @Nullable - final String lrsServerName; - // Max number of concurrent requests can be sent to this cluster. - // FIXME(chengyuanzhang): protobuf uint32 is int in Java, so this field can be Integer. - @Nullable - final Long maxConcurrentRequests; - // TLS context used to connect to connect to this cluster. - @Nullable - final UpstreamTlsContext upstreamTlsContext; + @AutoValue.Builder + abstract static class Builder { + // Private do not use. + protected abstract Builder clusterName(String clusterName); - private NonAggregateClusterConfig(String lbPolicy, @Nullable String lrsServerName, - @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) { - super(lbPolicy); - this.lrsServerName = lrsServerName; - this.maxConcurrentRequests = maxConcurrentRequests; - this.upstreamTlsContext = upstreamTlsContext; - } - } + // Private do not use. + protected abstract Builder clusterType(ClusterType clusterType); - static final class EdsClusterConfig extends NonAggregateClusterConfig { - // Alternative resource name to be used in EDS requests. - @Nullable - final String edsServiceName; + // Private do not use. + protected abstract Builder lbPolicy(String lbPolicy); - EdsClusterConfig(String lbPolicy, @Nullable String edsServiceName, - @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext upstreamTlsContext) { - super(lbPolicy, lrsServerName, maxConcurrentRequests, upstreamTlsContext); - this.edsServiceName = edsServiceName; + Builder lbPolicy(String lbPolicy, long minRingSize, long maxRingSize, + HashFunction hashFunction) { + return this.lbPolicy(lbPolicy).minRingSize(minRingSize).maxRingSize(maxRingSize) + .hashFunction(checkNotNull(hashFunction, "hashFunction")); } - @Override - public int hashCode() { - return Objects.hash(lbPolicy, edsServiceName, lrsServerName, maxConcurrentRequests, - upstreamTlsContext); - } + // Private do not use. + protected abstract Builder minRingSize(long minRingSize); - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - EdsClusterConfig that = (EdsClusterConfig) o; - return Objects.equals(lbPolicy, that.lbPolicy) - && Objects.equals(edsServiceName, that.edsServiceName) - && Objects.equals(lrsServerName, that.lrsServerName) - && Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests) - && Objects.equals(upstreamTlsContext, that.upstreamTlsContext); - } + // Private do not use. + protected abstract Builder maxRingSize(long maxRingSize); - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("lbPolicy", lbPolicy) - .add("edsServiceName", edsServiceName) - .add("lrsServerName", lrsServerName) - .add("maxConcurrentRequests", maxConcurrentRequests) - // Exclude upstreamTlsContext as its string representation is cumbersome. - .toString(); - } - } + // Private do not use. + protected abstract Builder hashFunction(HashFunction hashFunction); - static final class LogicalDnsClusterConfig extends NonAggregateClusterConfig { - LogicalDnsClusterConfig(String lbPolicy, @Nullable String lrsServerName, - @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) { - super(lbPolicy, lrsServerName, maxConcurrentRequests, upstreamTlsContext); - } + // Private do not use. + protected abstract Builder edsServiceName(String edsServiceName); - @Override - public int hashCode() { - return Objects.hash(lbPolicy, lrsServerName, maxConcurrentRequests, upstreamTlsContext); - } + // Private do not use. + protected abstract Builder lrsServerName(String lrsServerName); - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - LogicalDnsClusterConfig that = (LogicalDnsClusterConfig) o; - return Objects.equals(lbPolicy, that.lbPolicy) - && Objects.equals(lrsServerName, that.lrsServerName) - && Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests) - && Objects.equals(upstreamTlsContext, that.upstreamTlsContext); - } + // Private do not use. + protected abstract Builder maxConcurrentRequests(Long maxConcurrentRequests); - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("lbPolicy", lbPolicy) - .add("lrsServerName", lrsServerName) - .add("maxConcurrentRequests", maxConcurrentRequests) - // Exclude upstreamTlsContext as its string representation is cumbersome. - .toString(); - } + // Private do not use. + protected abstract Builder upstreamTlsContext(UpstreamTlsContext upstreamTlsContext); + + // Private do not use. + protected abstract Builder prioritizedClusterNames(List prioritizedClusterNames); + + abstract CdsUpdate build(); } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index 8957a4e5df..b95117acbf 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -47,10 +47,6 @@ import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; -import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; -import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig; import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil; import java.util.ArrayList; import java.util.Arrays; @@ -548,9 +544,9 @@ public class CdsLoadBalancer2Test { @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext) { if (watchers.containsKey(clusterName)) { - EdsClusterConfig clusterConfig = new EdsClusterConfig("round_robin", edsServiceName, - lrsServerName, maxConcurrentRequests, tlsContext); - CdsUpdate update = new CdsUpdate(clusterName, ClusterType.EDS, clusterConfig); + CdsUpdate update = CdsUpdate.forEds( + clusterName, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext) + .lbPolicy("round_robin").build(); watchers.get(clusterName).onChanged(update); } } @@ -558,17 +554,17 @@ public class CdsLoadBalancer2Test { private void deliverLogicalDnsCluster(String clusterName, @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext) { if (watchers.containsKey(clusterName)) { - LogicalDnsClusterConfig clusterConfig = new LogicalDnsClusterConfig("round_robin", - lrsServerName, maxConcurrentRequests, tlsContext); - CdsUpdate update = new CdsUpdate(clusterName, ClusterType.LOGICAL_DNS, clusterConfig); + CdsUpdate update = CdsUpdate.forLogicalDns( + clusterName, lrsServerName, maxConcurrentRequests, tlsContext) + .lbPolicy("round_robin").build(); watchers.get(clusterName).onChanged(update); } } private void deliverAggregateCluster(String clusterName, List clusters) { if (watchers.containsKey(clusterName)) { - AggregateClusterConfig clusterConfig = new AggregateClusterConfig("round_robin", clusters); - CdsUpdate update = new CdsUpdate(clusterName, ClusterType.AGGREGATE, clusterConfig); + CdsUpdate update = CdsUpdate.forAggregate(clusterName, clusters) + .lbPolicy("round_robin").build(); watchers.get(clusterName).onChanged(update); } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java index 27daa2d635..988a00350e 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java @@ -29,10 +29,16 @@ import io.envoyproxy.envoy.config.endpoint.v3.Endpoint; import io.envoyproxy.envoy.config.route.v3.DirectResponseAction; import io.envoyproxy.envoy.config.route.v3.FilterAction; import io.envoyproxy.envoy.config.route.v3.RedirectAction; +import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.ConnectionProperties; +import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.FilterState; +import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.Header; +import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.QueryParameter; import io.envoyproxy.envoy.config.route.v3.RouteAction.MaxStreamDuration; import io.envoyproxy.envoy.config.route.v3.WeightedCluster; import io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort.HeaderAbort; +import io.envoyproxy.envoy.type.matcher.v3.RegexMatchAndSubstitute; import io.envoyproxy.envoy.type.matcher.v3.RegexMatcher; +import io.envoyproxy.envoy.type.matcher.v3.RegexMatcher.GoogleRE2; import io.envoyproxy.envoy.type.v3.FractionalPercent; import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType; import io.envoyproxy.envoy.type.v3.Int64Range; @@ -47,9 +53,11 @@ import io.grpc.xds.Matchers.PathMatcher; import io.grpc.xds.VirtualHost.Route; import io.grpc.xds.VirtualHost.Route.RouteAction; import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight; +import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy; import io.grpc.xds.VirtualHost.Route.RouteMatch; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; @@ -77,7 +85,8 @@ public class ClientXdsClientDataTest { Route.create( RouteMatch.create(PathMatcher.fromPath("/service/method", false), Collections.emptyList(), null), - RouteAction.forCluster("cluster-foo", null), null)); + RouteAction.forCluster("cluster-foo", Collections.emptyList(), null), + null)); } @Test @@ -390,6 +399,50 @@ public class ClientXdsClientDataTest { assertThat(struct.getStruct().timeoutNano()).isNull(); } + @Test + public void parseRouteAction_withHashPolicies() { + io.envoyproxy.envoy.config.route.v3.RouteAction proto = + io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder() + .setCluster("cluster-foo") + .addHashPolicy( + io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder() + .setHeader( + Header.newBuilder() + .setHeaderName("user-agent") + .setRegexRewrite( + RegexMatchAndSubstitute.newBuilder() + .setPattern( + RegexMatcher.newBuilder() + .setGoogleRe2(GoogleRE2.getDefaultInstance()) + .setRegex("grpc.*")) + .setSubstitution("gRPC")))) + .addHashPolicy( + io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder() + .setConnectionProperties(ConnectionProperties.newBuilder().setSourceIp(true)) + .setTerminal(true)) // unsupported + .addHashPolicy( + io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder() + .setFilterState( + FilterState.newBuilder() + .setKey(ClientXdsClient.HASH_POLICY_FILTER_STATE_KEY))) + .addHashPolicy( + io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder() + .setQueryParameter( + QueryParameter.newBuilder().setName("param"))) // unsupported + .build(); + StructOrError struct = ClientXdsClient.parseRouteAction(proto); + List policies = struct.getStruct().hashPolicies(); + assertThat(policies).hasSize(2); + assertThat(policies.get(0).type()).isEqualTo(HashPolicy.Type.HEADER); + assertThat(policies.get(0).headerName()).isEqualTo("user-agent"); + assertThat(policies.get(0).isTerminal()).isFalse(); + assertThat(policies.get(0).regEx().pattern()).isEqualTo("grpc.*"); + assertThat(policies.get(0).regExSubstitution()).isEqualTo("gRPC"); + + assertThat(policies.get(1).type()).isEqualTo(HashPolicy.Type.CHANNEL_ID); + assertThat(policies.get(1).isTerminal()).isFalse(); + } + @Test public void parseClusterWeight() { io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight proto = diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index edc24b8180..5c88d366e5 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -47,14 +47,11 @@ import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LbEndpoint; import io.grpc.xds.Endpoints.LocalityLbEndpoints; import io.grpc.xds.EnvoyProtoData.Node; -import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsUpdate; -import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig; import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; -import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig; +import io.grpc.xds.XdsClient.CdsUpdate.HashFunction; import io.grpc.xds.XdsClient.EdsResourceWatcher; import io.grpc.xds.XdsClient.EdsUpdate; import io.grpc.xds.XdsClient.LdsResourceWatcher; @@ -597,8 +594,10 @@ public abstract class ClientXdsClientTestBase { startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster("cluster-bar.googleapis.com", null, false, null, null)), - Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, false, null, null))); + Any.pack(mf.buildEdsCluster("cluster-bar.googleapis.com", null, "round_robin", null, + false, null, null)), + Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, "round_robin", null, + false, null, null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. @@ -616,7 +615,7 @@ public abstract class ClientXdsClientTestBase { DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, null))); + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. @@ -624,14 +623,41 @@ public abstract class ClientXdsClientTestBase { "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(clusterConfig.edsServiceName).isNull(); - assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(clusterConfig.lrsServerName).isNull(); - assertThat(clusterConfig.maxConcurrentRequests).isNull(); - assertThat(clusterConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isNull(); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); + assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + @Test + public void cdsResourceFound_ringHashLbPolicy() { + DiscoveryRpcCall call = + startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); + Message ringHashConfig = mf.buildRingHashLbConfig("xx_hash", 10L, 100L); + List clusters = ImmutableList.of( + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "ring_hash", ringHashConfig, false, null, + null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); + + // Client sent an ACK CDS request. + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isNull(); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("ring_hash"); + assertThat(cdsUpdate.hashFunction()).isEqualTo(HashFunction.XX_HASH); + assertThat(cdsUpdate.minRingSize()).isEqualTo(10L); + assertThat(cdsUpdate.maxRingSize()).isEqualTo(100L); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); } @@ -642,7 +668,7 @@ public abstract class ClientXdsClientTestBase { List candidates = Arrays.asList( "cluster1.googleapis.com", "cluster2.googleapis.com", "cluster3.googleapis.com"); List clusters = ImmutableList.of( - Any.pack(mf.buildAggregateCluster(CDS_RESOURCE, candidates))); + Any.pack(mf.buildAggregateCluster(CDS_RESOURCE, "round_robin", null, candidates))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. @@ -650,12 +676,10 @@ public abstract class ClientXdsClientTestBase { "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.AGGREGATE); - AggregateClusterConfig clusterConfig = (AggregateClusterConfig) cdsUpdate.clusterConfig; - assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(clusterConfig.prioritizedClusterNames) - .containsExactlyElementsIn(candidates).inOrder(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.AGGREGATE); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.prioritizedClusterNames()).containsExactlyElementsIn(candidates).inOrder(); } @Test @@ -663,7 +687,7 @@ public abstract class ClientXdsClientTestBase { DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, mf.buildCircuitBreakers(50, 200)))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); @@ -672,14 +696,13 @@ public abstract class ClientXdsClientTestBase { "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(clusterConfig.edsServiceName).isNull(); - assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(clusterConfig.lrsServerName).isNull(); - assertThat(clusterConfig.maxConcurrentRequests).isEqualTo(200L); - assertThat(clusterConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isNull(); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isEqualTo(200L); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); } /** @@ -692,10 +715,13 @@ public abstract class ClientXdsClientTestBase { // Management server sends back CDS response with UpstreamTlsContext. List clusters = ImmutableList.of( - Any.pack(mf.buildLogicalDnsCluster("cluster-bar.googleapis.com", false, null, null)), - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, "eds-cluster-foo.googleapis.com", true, + Any.pack(mf.buildLogicalDnsCluster("cluster-bar.googleapis.com", "round_robin", null, + false, null, null)), + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, "eds-cluster-foo.googleapis.com", "round_robin", + null, true, mf.buildUpstreamTlsContext("secret1", "unix:/var/uds2"), null)), - Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, false, null, null))); + Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, "round_robin", null, false, + null, null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. @@ -703,10 +729,8 @@ public abstract class ClientXdsClientTestBase { "0000"); verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - UpstreamTlsContext upstreamTlsContext = - ((EdsClusterConfig) cdsUpdate.clusterConfig).upstreamTlsContext; - SdsSecretConfig validationContextSdsSecretConfig = upstreamTlsContext.getCommonTlsContext() - .getValidationContextSdsSecretConfig(); + SdsSecretConfig validationContextSdsSecretConfig = + cdsUpdate.upstreamTlsContext().getCommonTlsContext().getValidationContextSdsSecretConfig(); assertThat(validationContextSdsSecretConfig.getName()).isEqualTo("secret1"); assertThat( Iterables.getOnlyElement( @@ -724,7 +748,7 @@ public abstract class ClientXdsClientTestBase { DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, null))); + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sends an ACK CDS request. @@ -735,14 +759,13 @@ public abstract class ClientXdsClientTestBase { xdsClient.watchCdsResource(CDS_RESOURCE, watcher); verify(watcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(clusterConfig.edsServiceName).isNull(); - assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(clusterConfig.lrsServerName).isNull(); - assertThat(clusterConfig.maxConcurrentRequests).isNull(); - assertThat(clusterConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isNull(); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); call.verifyNoMoreRequest(); } @@ -763,7 +786,7 @@ public abstract class ClientXdsClientTestBase { DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(mf.buildLogicalDnsCluster(CDS_RESOURCE, false, null, null))); + Any.pack(mf.buildLogicalDnsCluster(CDS_RESOURCE, "round_robin", null, false, null, null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sends an ACK CDS request. @@ -771,17 +794,17 @@ public abstract class ClientXdsClientTestBase { "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.LOGICAL_DNS); - LogicalDnsClusterConfig dnsConfig = (LogicalDnsClusterConfig) cdsUpdate.clusterConfig; - assertThat(dnsConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(dnsConfig.lrsServerName).isNull(); - assertThat(dnsConfig.maxConcurrentRequests).isNull(); - assertThat(dnsConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); String edsService = "eds-service-bar.googleapis.com"; clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, edsService, true, null, null))); + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, edsService, "round_robin", null, true, null, + null))); call.sendResponse("1", clusters, ResourceType.CDS, "0001"); // Client sends an ACK CDS request. @@ -789,14 +812,13 @@ public abstract class ClientXdsClientTestBase { "0001"); verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - EdsClusterConfig edsConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(edsConfig.edsServiceName).isEqualTo(edsService); - assertThat(edsConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(edsConfig.lrsServerName).isEqualTo(""); - assertThat(edsConfig.maxConcurrentRequests).isNull(); - assertThat(edsConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); } @Test @@ -804,7 +826,7 @@ public abstract class ClientXdsClientTestBase { DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, null))); + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sends an ACK CDS request. @@ -812,14 +834,13 @@ public abstract class ClientXdsClientTestBase { "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(clusterConfig.edsServiceName).isNull(); - assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(clusterConfig.lrsServerName).isNull(); - assertThat(clusterConfig.maxConcurrentRequests).isNull(); - assertThat(clusterConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isNull(); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); call.sendResponse("1", Collections.emptyList(), ResourceType.CDS, "0001"); @@ -847,38 +868,36 @@ public abstract class ClientXdsClientTestBase { String edsService = "eds-service-bar.googleapis.com"; List clusters = ImmutableList.of( - Any.pack(mf.buildLogicalDnsCluster(CDS_RESOURCE, false, null, null)), - Any.pack(mf.buildEdsCluster(cdsResource, edsService, true, null, null))); + Any.pack(mf.buildLogicalDnsCluster(CDS_RESOURCE, "round_robin", null, false, null, null)), + Any.pack(mf.buildEdsCluster(cdsResource, edsService, "round_robin", null, true, null, + null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.LOGICAL_DNS); - LogicalDnsClusterConfig dnsConfig = (LogicalDnsClusterConfig) cdsUpdate.clusterConfig; - assertThat(dnsConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(dnsConfig.lrsServerName).isNull(); - assertThat(dnsConfig.maxConcurrentRequests).isNull(); - assertThat(dnsConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); verify(watcher1).onChanged(cdsUpdateCaptor.capture()); cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(cdsResource); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - EdsClusterConfig edsConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(edsConfig.edsServiceName).isEqualTo(edsService); - assertThat(edsConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(edsConfig.lrsServerName).isEqualTo(""); - assertThat(edsConfig.maxConcurrentRequests).isNull(); - assertThat(edsConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResource); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); verify(watcher2).onChanged(cdsUpdateCaptor.capture()); cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(cdsResource); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - edsConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(edsConfig.edsServiceName).isEqualTo(edsService); - assertThat(edsConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(edsConfig.lrsServerName).isEqualTo(""); - assertThat(edsConfig.maxConcurrentRequests).isNull(); - assertThat(edsConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResource); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); } @Test @@ -1094,17 +1113,18 @@ public abstract class ClientXdsClientTestBase { xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); List clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(resource, null, true, null, null)), - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, EDS_RESOURCE, false, null, null))); + Any.pack(mf.buildEdsCluster(resource, null, "round_robin", null, true, null, null)), + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, EDS_RESOURCE, "round_robin", null, false, null, + null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); verify(cdsWatcher).onChanged(cdsUpdateCaptor.capture()); - EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdateCaptor.getValue().clusterConfig; - assertThat(clusterConfig.edsServiceName).isEqualTo(null); - assertThat(clusterConfig.lrsServerName).isEqualTo(""); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.edsServiceName()).isEqualTo(null); + assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); - clusterConfig = (EdsClusterConfig) cdsUpdateCaptor.getValue().clusterConfig; - assertThat(clusterConfig.edsServiceName).isEqualTo(EDS_RESOURCE); - assertThat(clusterConfig.lrsServerName).isNull(); + cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.edsServiceName()).isEqualTo(EDS_RESOURCE); + assertThat(cdsUpdate.lrsServerName()).isNull(); List clusterLoadAssignments = ImmutableList.of( @@ -1134,12 +1154,12 @@ public abstract class ClientXdsClientTestBase { assertThat(edsUpdateCaptor.getValue().clusterName).isEqualTo(EDS_RESOURCE); clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(resource, null, true, null, null)), // no change - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, null))); + Any.pack(mf.buildEdsCluster(resource, null, "round_robin", null, true, null, + null)), // no change + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null))); call.sendResponse("1", clusters, ResourceType.CDS, "0001"); verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); - clusterConfig = (EdsClusterConfig) cdsUpdateCaptor.getValue().clusterConfig; - assertThat(clusterConfig.edsServiceName).isNull(); + assertThat(cdsUpdateCaptor.getValue().edsServiceName()).isNull(); verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); verifyNoMoreInteractions(cdsWatcher, edsWatcher); } @@ -1535,13 +1555,18 @@ public abstract class ClientXdsClientTestBase { protected abstract List buildOpaqueRoutes(int num); protected abstract Message buildEdsCluster(String clusterName, @Nullable String edsServiceName, - boolean enableLrs, @Nullable Message upstreamTlsContext, - @Nullable Message circuitBreakers); - - protected abstract Message buildLogicalDnsCluster(String clusterName, boolean enableLrs, + String lbPolicy, @Nullable Message ringHashLbConfig, boolean enableLrs, @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers); - protected abstract Message buildAggregateCluster(String clusterName, List clusters); + protected abstract Message buildLogicalDnsCluster(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, boolean enableLrs, + @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers); + + protected abstract Message buildAggregateCluster(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, List clusters); + + protected abstract Message buildRingHashLbConfig(String hashFunction, long minRingSize, + long maxRingSize); protected abstract Message buildUpstreamTlsContext(String secretName, String targetUri); diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java index 341bac7a72..424af04ccc 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java @@ -27,12 +27,15 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; import com.google.protobuf.Message; import com.google.protobuf.UInt32Value; +import com.google.protobuf.UInt64Value; import com.google.protobuf.util.Durations; import io.envoyproxy.envoy.api.v2.Cluster; import io.envoyproxy.envoy.api.v2.Cluster.CustomClusterType; import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType; import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig; import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy; +import io.envoyproxy.envoy.api.v2.Cluster.RingHashLbConfig; +import io.envoyproxy.envoy.api.v2.Cluster.RingHashLbConfig.HashFunction; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload; @@ -384,10 +387,10 @@ public class ClientXdsClientV2Test extends ClientXdsClientTestBase { @Override protected Message buildEdsCluster(String clusterName, @Nullable String edsServiceName, - boolean enableLrs, @Nullable Message upstreamTlsContext, - @Nullable Message circuitBreakers) { - Cluster.Builder builder = - initClusterBuilder(clusterName, enableLrs, upstreamTlsContext, circuitBreakers); + String lbPolicy, @Nullable Message ringHashLbConfig, boolean enableLrs, + @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { + Cluster.Builder builder = initClusterBuilder(clusterName, lbPolicy, ringHashLbConfig, + enableLrs, upstreamTlsContext, circuitBreakers); builder.setType(DiscoveryType.EDS); EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); edsClusterConfigBuilder.setEdsConfig( @@ -400,34 +403,49 @@ public class ClientXdsClientV2Test extends ClientXdsClientTestBase { } @Override - protected Message buildLogicalDnsCluster(String clusterName, boolean enableLrs, + protected Message buildLogicalDnsCluster(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, boolean enableLrs, @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { - Cluster.Builder builder = - initClusterBuilder(clusterName, enableLrs, upstreamTlsContext, circuitBreakers); + Cluster.Builder builder = initClusterBuilder(clusterName, lbPolicy, ringHashLbConfig, + enableLrs, upstreamTlsContext, circuitBreakers); builder.setType(DiscoveryType.LOGICAL_DNS); return builder.build(); } @Override - protected Message buildAggregateCluster(String clusterName, List clusters) { + protected Message buildAggregateCluster(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, List clusters) { ClusterConfig clusterConfig = ClusterConfig.newBuilder().addAllClusters(clusters).build(); CustomClusterType type = CustomClusterType.newBuilder() .setName(ClientXdsClient.AGGREGATE_CLUSTER_TYPE_NAME) .setTypedConfig(Any.pack(clusterConfig)) .build(); - return Cluster.newBuilder() - .setName(clusterName) - .setLbPolicy(LbPolicy.ROUND_ROBIN) - .setClusterType(type) - .build(); + Cluster.Builder builder = Cluster.newBuilder().setName(clusterName).setClusterType(type); + if (lbPolicy.equals("round_robin")) { + builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + } else if (lbPolicy.equals("ring_hash")) { + builder.setLbPolicy(LbPolicy.RING_HASH); + builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig); + } else { + throw new AssertionError("Invalid LB policy"); + } + return builder.build(); } - private Cluster.Builder initClusterBuilder(String clusterName, boolean enableLrs, + private Cluster.Builder initClusterBuilder(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, boolean enableLrs, @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { Cluster.Builder builder = Cluster.newBuilder(); builder.setName(clusterName); - builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + if (lbPolicy.equals("round_robin")) { + builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + } else if (lbPolicy.equals("ring_hash")) { + builder.setLbPolicy(LbPolicy.RING_HASH); + builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig); + } else { + throw new AssertionError("Invalid LB policy"); + } if (enableLrs) { builder.setLrsServer( ConfigSource.newBuilder() @@ -445,6 +463,22 @@ public class ClientXdsClientV2Test extends ClientXdsClientTestBase { return builder; } + @Override + protected Message buildRingHashLbConfig(String hashFunction, long minRingSize, + long maxRingSize) { + RingHashLbConfig.Builder builder = RingHashLbConfig.newBuilder(); + if (hashFunction.equals("xx_hash")) { + builder.setHashFunction(HashFunction.XX_HASH); + } else if (hashFunction.equals("murmur_hash_2")) { + builder.setHashFunction(HashFunction.MURMUR_HASH_2); + } else { + throw new AssertionError("Invalid hash function"); + } + builder.setMinimumRingSize(UInt64Value.newBuilder().setValue(minRingSize).build()); + builder.setMaximumRingSize(UInt64Value.newBuilder().setValue(maxRingSize).build()); + return builder.build(); + } + @Override protected Message buildUpstreamTlsContext(String secretName, String targetUri) { GrpcService grpcService = diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java index dca11b3984..66cf8be5fb 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; import com.google.protobuf.Message; import com.google.protobuf.UInt32Value; +import com.google.protobuf.UInt64Value; import com.google.protobuf.util.Durations; import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers; import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; @@ -35,6 +36,8 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster.CustomClusterType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig.HashFunction; import io.envoyproxy.envoy.config.core.v3.Address; import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; import io.envoyproxy.envoy.config.core.v3.ApiConfigSource; @@ -382,10 +385,10 @@ public class ClientXdsClientV3Test extends ClientXdsClientTestBase { @Override protected Message buildEdsCluster(String clusterName, @Nullable String edsServiceName, - boolean enableLrs, @Nullable Message upstreamTlsContext, - @Nullable Message circuitBreakers) { - Cluster.Builder builder = - initClusterBuilder(clusterName, enableLrs, upstreamTlsContext, circuitBreakers); + String lbPolicy, @Nullable Message ringHashLbConfig, boolean enableLrs, + @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { + Cluster.Builder builder = initClusterBuilder(clusterName, lbPolicy, ringHashLbConfig, + enableLrs, upstreamTlsContext, circuitBreakers); builder.setType(DiscoveryType.EDS); EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); edsClusterConfigBuilder.setEdsConfig( @@ -398,34 +401,49 @@ public class ClientXdsClientV3Test extends ClientXdsClientTestBase { } @Override - protected Message buildLogicalDnsCluster(String clusterName, boolean enableLrs, + protected Message buildLogicalDnsCluster(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, boolean enableLrs, @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { - Cluster.Builder builder = - initClusterBuilder(clusterName, enableLrs, upstreamTlsContext, circuitBreakers); + Cluster.Builder builder = initClusterBuilder(clusterName, lbPolicy, ringHashLbConfig, + enableLrs, upstreamTlsContext, circuitBreakers); builder.setType(DiscoveryType.LOGICAL_DNS); return builder.build(); } @Override - protected Message buildAggregateCluster(String clusterName, List clusters) { + protected Message buildAggregateCluster(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, List clusters) { ClusterConfig clusterConfig = ClusterConfig.newBuilder().addAllClusters(clusters).build(); CustomClusterType type = CustomClusterType.newBuilder() .setName(ClientXdsClient.AGGREGATE_CLUSTER_TYPE_NAME) .setTypedConfig(Any.pack(clusterConfig)) .build(); - return Cluster.newBuilder() - .setName(clusterName) - .setLbPolicy(LbPolicy.ROUND_ROBIN) - .setClusterType(type) - .build(); + Cluster.Builder builder = Cluster.newBuilder().setName(clusterName).setClusterType(type); + if (lbPolicy.equals("round_robin")) { + builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + } else if (lbPolicy.equals("ring_hash")) { + builder.setLbPolicy(LbPolicy.RING_HASH); + builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig); + } else { + throw new AssertionError("Invalid LB policy"); + } + return builder.build(); } - private Cluster.Builder initClusterBuilder(String clusterName, boolean enableLrs, + private Cluster.Builder initClusterBuilder(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, boolean enableLrs, @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { Cluster.Builder builder = Cluster.newBuilder(); builder.setName(clusterName); - builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + if (lbPolicy.equals("round_robin")) { + builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + } else if (lbPolicy.equals("ring_hash")) { + builder.setLbPolicy(LbPolicy.RING_HASH); + builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig); + } else { + throw new AssertionError("Invalid LB policy"); + } if (enableLrs) { builder.setLrsServer( ConfigSource.newBuilder() @@ -443,6 +461,22 @@ public class ClientXdsClientV3Test extends ClientXdsClientTestBase { return builder; } + @Override + protected Message buildRingHashLbConfig(String hashFunction, long minRingSize, + long maxRingSize) { + RingHashLbConfig.Builder builder = RingHashLbConfig.newBuilder(); + if (hashFunction.equals("xx_hash")) { + builder.setHashFunction(HashFunction.XX_HASH); + } else if (hashFunction.equals("murmur_hash_2")) { + builder.setHashFunction(HashFunction.MURMUR_HASH_2); + } else { + throw new AssertionError("Invalid hash function"); + } + builder.setMinimumRingSize(UInt64Value.newBuilder().setValue(minRingSize).build()); + builder.setMaximumRingSize(UInt64Value.newBuilder().setValue(maxRingSize).build()); + return builder.build(); + } + @Override protected Message buildUpstreamTlsContext(String secretName, String targetUri) { GrpcService grpcService = diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index 7eeea86e9a..8b7da8fee6 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -56,6 +56,7 @@ import io.grpc.xds.Matchers.PathMatcher; import io.grpc.xds.VirtualHost.Route; import io.grpc.xds.VirtualHost.Route.RouteAction; import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight; +import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy; import io.grpc.xds.VirtualHost.Route.RouteMatch; import io.grpc.xds.XdsClient.RdsResourceWatcher; import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; @@ -238,9 +239,11 @@ public class XdsNameResolverTest { private List buildUnmatchedVirtualHosts() { Route route1 = Route.create(RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null); + RouteAction.forCluster( + cluster2, Collections.emptyList(), TimeUnit.SECONDS.toNanos(15L)), null); Route route2 = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - RouteAction.forCluster(cluster1, TimeUnit.SECONDS.toNanos(15L)), null); + RouteAction.forCluster( + cluster1, Collections.emptyList(), TimeUnit.SECONDS.toNanos(15L)), null); return Arrays.asList( VirtualHost.create("virtualhost-foo", Collections.singletonList("hello.googleapis.com"), Collections.singletonList(route1), null), @@ -253,7 +256,8 @@ public class XdsNameResolverTest { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); Route route = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - RouteAction.forCluster(cluster1, null), null); // per-route timeout unset + RouteAction.forCluster( + cluster1, Collections.emptyList(), null), null); // per-route timeout unset VirtualHost virtualHost = VirtualHost.create("does not matter", Collections.singletonList(AUTHORITY), Collections.singletonList(route), null); xdsClient.deliverLdsUpdate(AUTHORITY, 0L, Collections.singletonList(virtualHost)); @@ -268,7 +272,8 @@ public class XdsNameResolverTest { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); Route route = Route.create(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - RouteAction.forCluster(cluster1, null), null); // per-route timeout unset + RouteAction.forCluster( + cluster1, Collections.emptyList(), null), null); // per-route timeout unset VirtualHost virtualHost = VirtualHost.create("does not matter", Collections.singletonList(AUTHORITY), Collections.singletonList(route), null); xdsClient.deliverLdsUpdate(AUTHORITY, TimeUnit.SECONDS.toNanos(5L), @@ -314,10 +319,14 @@ public class XdsNameResolverTest { Arrays.asList( Route.create( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - RouteAction.forCluster("another-cluster", TimeUnit.SECONDS.toNanos(20L)), null), + RouteAction.forCluster( + "another-cluster", Collections.emptyList(), + TimeUnit.SECONDS.toNanos(20L)), null), Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); + RouteAction.forCluster( + cluster2, Collections.emptyList(), TimeUnit.SECONDS.toNanos(15L)), + null))); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); // Updated service config still contains cluster1 while it is removed resource. New calls no @@ -349,10 +358,13 @@ public class XdsNameResolverTest { Arrays.asList( Route.create( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - RouteAction.forCluster("another-cluster", TimeUnit.SECONDS.toNanos(20L)), null), + RouteAction.forCluster( + "another-cluster", Collections.emptyList(), + TimeUnit.SECONDS.toNanos(20L)), null), Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); + RouteAction.forCluster(cluster2, Collections.emptyList(), + TimeUnit.SECONDS.toNanos(15L)), null))); // Two consecutive service config updates: one for removing clcuster1, // one for adding "another=cluster". verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture()); @@ -380,10 +392,12 @@ public class XdsNameResolverTest { Arrays.asList( Route.create( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - RouteAction.forCluster("another-cluster", TimeUnit.SECONDS.toNanos(20L)), null), + RouteAction.forCluster("another-cluster", Collections.emptyList(), + TimeUnit.SECONDS.toNanos(20L)), null), Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); + RouteAction.forCluster(cluster2, Collections.emptyList(), + TimeUnit.SECONDS.toNanos(15L)), null))); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); @@ -396,10 +410,12 @@ public class XdsNameResolverTest { Arrays.asList( Route.create( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - RouteAction.forCluster("another-cluster", TimeUnit.SECONDS.toNanos(15L)), null), + RouteAction.forCluster("another-cluster", Collections.emptyList(), + TimeUnit.SECONDS.toNanos(15L)), null), Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); + RouteAction.forCluster(cluster2, Collections.emptyList(), + TimeUnit.SECONDS.toNanos(15L)), null))); verifyNoMoreInteractions(mockListener); // no cluster added/deleted assertCallSelectResult(call1, configSelector, "another-cluster", 15.0); } @@ -414,16 +430,19 @@ public class XdsNameResolverTest { Collections.singletonList( Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); + RouteAction.forCluster(cluster2, Collections.emptyList(), + TimeUnit.SECONDS.toNanos(15L)), null))); xdsClient.deliverLdsUpdate( AUTHORITY, Arrays.asList( Route.create( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - RouteAction.forCluster(cluster1, TimeUnit.SECONDS.toNanos(15L)), null), + RouteAction.forCluster(cluster1, Collections.emptyList(), + TimeUnit.SECONDS.toNanos(15L)), null), Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); + RouteAction.forCluster(cluster2, Collections.emptyList(), + TimeUnit.SECONDS.toNanos(15L)), null))); testCall.deliverErrorStatus(); verifyNoMoreInteractions(mockListener); } @@ -443,6 +462,7 @@ public class XdsNameResolverTest { Arrays.asList( ClusterWeight.create(cluster1, 20, null), ClusterWeight.create(cluster2, 80, null)), + Collections.emptyList(), TimeUnit.SECONDS.toNanos(20L)), null))); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); @@ -500,10 +520,12 @@ public class XdsNameResolverTest { Arrays.asList( Route.create( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - RouteAction.forCluster(cluster1, TimeUnit.SECONDS.toNanos(15L)), null), + RouteAction.forCluster(cluster1, Collections.emptyList(), + TimeUnit.SECONDS.toNanos(15L)), null), Route.create( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - RouteAction.forCluster(cluster2, TimeUnit.SECONDS.toNanos(15L)), null))); + RouteAction.forCluster(cluster2, Collections.emptyList(), + TimeUnit.SECONDS.toNanos(15L)), null))); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); assertThat(result.getAddresses()).isEmpty();