mirror of https://github.com/grpc/grpc-java.git
xds: Use wrr_locality LB and support load_balancing_policy in Cluster (#9141)
Instead of providing round robin or least request configurations directly, ClientXdsClient now wraps them in a WRR locality config. ClusterResolverLoadBalancer passes this configuration directly to PriorityLoadBalancer to use as the endpoint LB policy it provides to ClusterImplLoadBalancer. A new ResolvedAddresses attribute is also set that has all the locality weights. This is needed by WrrLocalityLoadBalancer when it configures WeightedTargetLoadBalancer. Renames the LegacyLoadBalancerConfigFactory to just LoadBalancerConfigFactory and gives it responsibility for both the legacy and the new LB config mechanism. The new configuration mechanism is explained in gRFC A52: https://github.com/grpc/proposal/pull/298
This commit is contained in:
parent
15ecc0714e
commit
c20904d681
|
|
@ -1635,9 +1635,7 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
|
|||
}
|
||||
CdsUpdate.Builder updateBuilder = structOrError.getStruct();
|
||||
|
||||
// TODO: If load_balancing_policy is set in Cluster use it for LB config, otherwise fall back
|
||||
// to using the legacy lb_policy field.
|
||||
ImmutableMap<String, ?> lbPolicyConfig = LegacyLoadBalancerConfigFactory.newConfig(cluster,
|
||||
ImmutableMap<String, ?> lbPolicyConfig = LoadBalancerConfigFactory.newConfig(cluster,
|
||||
enableLeastRequest);
|
||||
|
||||
// Validate the LB config by trying to parse it with the corresponding LB provider.
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ package io.grpc.xds;
|
|||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
|
||||
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.grpc.Attributes;
|
||||
|
|
@ -49,8 +48,6 @@ import io.grpc.xds.Endpoints.LocalityLbEndpoints;
|
|||
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
|
||||
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
|
||||
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
|
||||
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
|
||||
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
|
||||
import io.grpc.xds.XdsClient.EdsResourceWatcher;
|
||||
import io.grpc.xds.XdsClient.EdsUpdate;
|
||||
import io.grpc.xds.XdsLogger.XdsLogLevel;
|
||||
|
|
@ -208,6 +205,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
List<EquivalentAddressGroup> addresses = new ArrayList<>();
|
||||
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
|
||||
List<String> priorities = new ArrayList<>(); // totally ordered priority list
|
||||
Map<Locality, Integer> localityWeights = new HashMap<>();
|
||||
|
||||
Status endpointNotFound = Status.OK;
|
||||
for (String cluster : clusters) {
|
||||
ClusterState state = clusterStates.get(cluster);
|
||||
|
|
@ -219,6 +218,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
addresses.addAll(state.result.addresses);
|
||||
priorityChildConfigs.putAll(state.result.priorityChildConfigs);
|
||||
priorities.addAll(state.result.priorities);
|
||||
localityWeights.putAll(state.result.localityWeights);
|
||||
} else {
|
||||
endpointNotFound = state.status;
|
||||
}
|
||||
|
|
@ -249,6 +249,9 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
resolvedAddresses.toBuilder()
|
||||
.setLoadBalancingPolicyConfig(childConfig)
|
||||
.setAddresses(Collections.unmodifiableList(addresses))
|
||||
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
|
||||
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS,
|
||||
Collections.unmodifiableMap(localityWeights)).build())
|
||||
.build());
|
||||
}
|
||||
|
||||
|
|
@ -318,6 +321,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
// Most recently resolved addresses and config, or null if resource not exists.
|
||||
@Nullable
|
||||
protected ClusterResolutionResult result;
|
||||
|
||||
protected boolean shutdown;
|
||||
|
||||
private ClusterState(String name, @Nullable ServerInfo lrsServerInfo,
|
||||
|
|
@ -377,6 +381,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
}
|
||||
Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
|
||||
update.localityLbEndpointsMap;
|
||||
Map<Locality, Integer> localityWeights = new HashMap<>();
|
||||
List<DropOverload> dropOverloads = update.dropPolicies;
|
||||
List<EquivalentAddressGroup> addresses = new ArrayList<>();
|
||||
Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
|
||||
|
|
@ -409,6 +414,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
"Discard locality {0} with 0 healthy endpoints", locality);
|
||||
continue;
|
||||
}
|
||||
localityWeights.put(locality, localityLbInfo.localityWeight());
|
||||
if (!prioritizedLocalityWeights.containsKey(priorityName)) {
|
||||
prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>());
|
||||
}
|
||||
|
|
@ -428,7 +434,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
endpointLbPolicy, lbRegistry, prioritizedLocalityWeights, dropOverloads);
|
||||
status = Status.OK;
|
||||
resolved = true;
|
||||
result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities);
|
||||
result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities,
|
||||
localityWeights);
|
||||
handleEndpointResourceUpdate();
|
||||
}
|
||||
}
|
||||
|
|
@ -634,18 +641,23 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
private final Map<String, PriorityChildConfig> priorityChildConfigs;
|
||||
// List of priority names ordered in descending priorities.
|
||||
private final List<String> priorities;
|
||||
// Most recent view on how localities in the cluster should be wighted. Only set for EDS
|
||||
// clusters that support the concept.
|
||||
private final Map<Locality, Integer> localityWeights;
|
||||
|
||||
ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority,
|
||||
PriorityChildConfig config) {
|
||||
this(addresses, Collections.singletonMap(priority, config),
|
||||
Collections.singletonList(priority));
|
||||
Collections.singletonList(priority), Collections.emptyMap());
|
||||
}
|
||||
|
||||
ClusterResolutionResult(List<EquivalentAddressGroup> addresses,
|
||||
Map<String, PriorityChildConfig> configs, List<String> priorities) {
|
||||
Map<String, PriorityChildConfig> configs, List<String> priorities,
|
||||
Map<Locality, Integer> localityWeights) {
|
||||
this.addresses = addresses;
|
||||
this.priorityChildConfigs = configs;
|
||||
this.priorities = priorities;
|
||||
this.localityWeights = localityWeights;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -686,32 +698,9 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
List<DropOverload> dropOverloads) {
|
||||
Map<String, PriorityChildConfig> configs = new HashMap<>();
|
||||
for (String priority : prioritizedLocalityWeights.keySet()) {
|
||||
PolicySelection leafPolicy = endpointLbPolicy;
|
||||
// Depending on the endpoint-level load balancing policy, different LB hierarchy may be
|
||||
// created. If the endpoint-level LB policy is round_robin or least_request_experimental,
|
||||
// it creates a two-level LB hierarchy: a locality-level LB policy that balances load
|
||||
// according to locality weights followed by an endpoint-level LB policy that balances load
|
||||
// between endpoints within the locality. If the endpoint-level LB policy is
|
||||
// ring_hash_experimental, it creates a unified LB policy that balances load by weighing the
|
||||
// product of each endpoint's weight and the weight of the locality it belongs to.
|
||||
if (endpointLbPolicy.getProvider().getPolicyName().equals("round_robin")
|
||||
|| endpointLbPolicy.getProvider().getPolicyName().equals("least_request_experimental")) {
|
||||
Map<Locality, Integer> localityWeights = prioritizedLocalityWeights.get(priority);
|
||||
Map<String, WeightedPolicySelection> targets = new HashMap<>();
|
||||
for (Locality locality : localityWeights.keySet()) {
|
||||
int weight = localityWeights.get(locality);
|
||||
WeightedPolicySelection target = new WeightedPolicySelection(weight, endpointLbPolicy);
|
||||
targets.put(localityName(locality), target);
|
||||
}
|
||||
LoadBalancerProvider weightedTargetLbProvider =
|
||||
lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME);
|
||||
WeightedTargetConfig weightedTargetConfig =
|
||||
new WeightedTargetConfig(Collections.unmodifiableMap(targets));
|
||||
leafPolicy = new PolicySelection(weightedTargetLbProvider, weightedTargetConfig);
|
||||
}
|
||||
ClusterImplConfig clusterImplConfig =
|
||||
new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests,
|
||||
dropOverloads, leafPolicy, tlsContext);
|
||||
dropOverloads, endpointLbPolicy, tlsContext);
|
||||
LoadBalancerProvider clusterImplLbProvider =
|
||||
lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
|
||||
PolicySelection clusterImplPolicy =
|
||||
|
|
|
|||
|
|
@ -1,105 +0,0 @@
|
|||
/*
|
||||
* Copyright 2022 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig;
|
||||
import io.grpc.xds.ClientXdsClient.ResourceInvalidException;
|
||||
|
||||
/**
|
||||
* Builds a JSON LB configuration based on the old style of using the xDS Cluster proto message. The
|
||||
* lb_policy field is used to select the policy and configuration is extracted from various policy
|
||||
* specific fields in Cluster.
|
||||
*/
|
||||
abstract class LegacyLoadBalancerConfigFactory {
|
||||
|
||||
static final String ROUND_ROBIN_FIELD_NAME = "round_robin";
|
||||
|
||||
static final String RING_HASH_FIELD_NAME = "ring_hash_experimental";
|
||||
static final String MIN_RING_SIZE_FIELD_NAME = "minRingSize";
|
||||
static final String MAX_RING_SIZE_FIELD_NAME = "maxRingSize";
|
||||
|
||||
static final String LEAST_REQUEST_FIELD_NAME = "least_request_experimental";
|
||||
static final String CHOICE_COUNT_FIELD_NAME = "choiceCount";
|
||||
|
||||
/**
|
||||
* Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link
|
||||
* Cluster}.
|
||||
*
|
||||
* @throws ResourceInvalidException If the {@link Cluster} has an invalid LB configuration.
|
||||
*/
|
||||
static ImmutableMap<String, ?> newConfig(Cluster cluster, boolean enableLeastRequest)
|
||||
throws ResourceInvalidException {
|
||||
switch (cluster.getLbPolicy()) {
|
||||
case ROUND_ROBIN:
|
||||
return newRoundRobinConfig();
|
||||
case RING_HASH:
|
||||
return newRingHashConfig(cluster);
|
||||
case LEAST_REQUEST:
|
||||
if (enableLeastRequest) {
|
||||
return newLeastRequestConfig(cluster);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
}
|
||||
throw new ResourceInvalidException(
|
||||
"Cluster " + cluster.getName() + ": unsupported lb policy: " + cluster.getLbPolicy());
|
||||
}
|
||||
|
||||
// Builds an empty configuration for round robin (it is not configurable).
|
||||
private static ImmutableMap<String, ?> newRoundRobinConfig() {
|
||||
return ImmutableMap.of(ROUND_ROBIN_FIELD_NAME, ImmutableMap.of());
|
||||
}
|
||||
|
||||
// Builds a ring hash config and validates the hash function selection.
|
||||
private static ImmutableMap<String, ?> newRingHashConfig(Cluster cluster)
|
||||
throws ResourceInvalidException {
|
||||
RingHashLbConfig lbConfig = cluster.getRingHashLbConfig();
|
||||
|
||||
// The hash function needs to be validated here as it is not exposed in the returned
|
||||
// configuration for later validation.
|
||||
if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH) {
|
||||
throw new ResourceInvalidException(
|
||||
"Cluster " + cluster.getName() + ": invalid ring hash function: " + lbConfig);
|
||||
}
|
||||
|
||||
ImmutableMap.Builder<String, Object> configBuilder = ImmutableMap.builder();
|
||||
if (lbConfig.hasMinimumRingSize()) {
|
||||
configBuilder.put(MIN_RING_SIZE_FIELD_NAME,
|
||||
((Long) lbConfig.getMinimumRingSize().getValue()).doubleValue());
|
||||
}
|
||||
if (lbConfig.hasMaximumRingSize()) {
|
||||
configBuilder.put(MAX_RING_SIZE_FIELD_NAME,
|
||||
((Long) lbConfig.getMaximumRingSize().getValue()).doubleValue());
|
||||
}
|
||||
return ImmutableMap.of(RING_HASH_FIELD_NAME, configBuilder.buildOrThrow());
|
||||
}
|
||||
|
||||
// Builds a new least request config.
|
||||
private static ImmutableMap<String, ?> newLeastRequestConfig(Cluster cluster) {
|
||||
LeastRequestLbConfig lbConfig = cluster.getLeastRequestLbConfig();
|
||||
|
||||
ImmutableMap.Builder<String, Object> configBuilder = ImmutableMap.builder();
|
||||
if (lbConfig.hasChoiceCount()) {
|
||||
configBuilder.put(CHOICE_COUNT_FIELD_NAME,
|
||||
((Integer) lbConfig.getChoiceCount().getValue()).doubleValue());
|
||||
}
|
||||
return ImmutableMap.of(LEAST_REQUEST_FIELD_NAME, configBuilder.buildOrThrow());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,321 @@
|
|||
/*
|
||||
* Copyright 2022 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import com.github.xds.type.v3.TypedStruct;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import com.google.protobuf.util.JsonFormat;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy.Policy;
|
||||
import io.envoyproxy.envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash;
|
||||
import io.envoyproxy.envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin;
|
||||
import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality;
|
||||
import io.grpc.InternalLogId;
|
||||
import io.grpc.LoadBalancerRegistry;
|
||||
import io.grpc.internal.JsonParser;
|
||||
import io.grpc.xds.ClientXdsClient.ResourceInvalidException;
|
||||
import io.grpc.xds.LoadBalancerConfigFactory.LoadBalancingPolicyConverter.MaxRecursionReachedException;
|
||||
import io.grpc.xds.XdsLogger.XdsLogLevel;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Creates service config JSON load balancer config objects for a given xDS Cluster message.
|
||||
* Supports both the "legacy" configuration style and the new, more advanced one that utilizes the
|
||||
* xDS "typed extension" mechanism.
|
||||
*
|
||||
* <p>Legacy configuration is done by setting the lb_policy enum field and any supporting
|
||||
* configuration fields needed by the particular policy.
|
||||
*
|
||||
* <p>The new approach is to set the load_balancing_policy field that contains both the policy
|
||||
* selection as well as any supporting configuration data. Providing a list of acceptable policies
|
||||
* is also supported. Note that if this field is used, it will override any configuration set using
|
||||
* the legacy approach. The new configuration approach is explained in detail in the <a href="
|
||||
* https://github.com/grpc/proposal/blob/master/A52-xds-custom-lb-policies.md">Custom LB Policies
|
||||
* gRFC</a>
|
||||
*/
|
||||
class LoadBalancerConfigFactory {
|
||||
|
||||
private static XdsLogger logger = XdsLogger.withLogId(
|
||||
InternalLogId.allocate("xds-client-lbconfig-factory", null));
|
||||
|
||||
static final String ROUND_ROBIN_FIELD_NAME = "round_robin";
|
||||
|
||||
static final String RING_HASH_FIELD_NAME = "ring_hash_experimental";
|
||||
static final String MIN_RING_SIZE_FIELD_NAME = "minRingSize";
|
||||
static final String MAX_RING_SIZE_FIELD_NAME = "maxRingSize";
|
||||
|
||||
static final String LEAST_REQUEST_FIELD_NAME = "least_request_experimental";
|
||||
static final String CHOICE_COUNT_FIELD_NAME = "choiceCount";
|
||||
|
||||
static final String WRR_LOCALITY_FIELD_NAME = "wrr_locality_experimental";
|
||||
static final String CHILD_POLICY_FIELD = "childPolicy";
|
||||
|
||||
/**
|
||||
* Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link
|
||||
* Cluster}.
|
||||
*
|
||||
* @throws ResourceInvalidException If the {@link Cluster} has an invalid LB configuration.
|
||||
*/
|
||||
static ImmutableMap<String, ?> newConfig(Cluster cluster, boolean enableLeastRequest)
|
||||
throws ResourceInvalidException {
|
||||
// The new load_balancing_policy will always be used if it is set, but for backward
|
||||
// compatibility we will fall back to using the old lb_policy field if the new field is not set.
|
||||
if (cluster.hasLoadBalancingPolicy()) {
|
||||
try {
|
||||
return LoadBalancingPolicyConverter.convertToServiceConfig(cluster.getLoadBalancingPolicy(),
|
||||
0);
|
||||
} catch (MaxRecursionReachedException e) {
|
||||
throw new ResourceInvalidException("Maximum LB config recursion depth reached");
|
||||
}
|
||||
} else {
|
||||
return LegacyLoadBalancingPolicyConverter.convertToServiceConfig(cluster, enableLeastRequest);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a service config JSON object for the ring_hash load balancer config based on the given
|
||||
* config values.
|
||||
*/
|
||||
private static ImmutableMap<String, ?> buildRingHashConfig(Long minRingSize, Long maxRingSize) {
|
||||
ImmutableMap.Builder<String, Object> configBuilder = ImmutableMap.builder();
|
||||
if (minRingSize != null) {
|
||||
configBuilder.put(MIN_RING_SIZE_FIELD_NAME, minRingSize.doubleValue());
|
||||
}
|
||||
if (maxRingSize != null) {
|
||||
configBuilder.put(MAX_RING_SIZE_FIELD_NAME, maxRingSize.doubleValue());
|
||||
}
|
||||
return ImmutableMap.of(RING_HASH_FIELD_NAME, configBuilder.buildOrThrow());
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a service config JSON object for the least_request load balancer config based on the
|
||||
* given config values..
|
||||
*/
|
||||
private static ImmutableMap<String, ?> buildLeastRequestConfig(Integer choiceCount) {
|
||||
ImmutableMap.Builder<String, Object> configBuilder = ImmutableMap.builder();
|
||||
if (choiceCount != null) {
|
||||
configBuilder.put(CHOICE_COUNT_FIELD_NAME, choiceCount.doubleValue());
|
||||
}
|
||||
return ImmutableMap.of(LEAST_REQUEST_FIELD_NAME, configBuilder.buildOrThrow());
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a service config JSON wrr_locality by wrapping another policy config.
|
||||
*/
|
||||
private static ImmutableMap<String, ?> buildWrrLocalityConfig(
|
||||
ImmutableMap<String, ?> childConfig) {
|
||||
return ImmutableMap.<String, Object>builder().put(WRR_LOCALITY_FIELD_NAME,
|
||||
ImmutableMap.of(CHILD_POLICY_FIELD, ImmutableList.of(childConfig))).buildOrThrow();
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds an empty service config JSON config object for round robin (it is not configurable).
|
||||
*/
|
||||
private static ImmutableMap<String, ?> buildRoundRobinConfig() {
|
||||
return ImmutableMap.of(ROUND_ROBIN_FIELD_NAME, ImmutableMap.of());
|
||||
}
|
||||
|
||||
/**
|
||||
* Responsible for converting from a {@code envoy.config.cluster.v3.LoadBalancingPolicy} proto
|
||||
* message to a gRPC service config format.
|
||||
*/
|
||||
static class LoadBalancingPolicyConverter {
|
||||
|
||||
private static final int MAX_RECURSION = 16;
|
||||
|
||||
/**
|
||||
* Converts a {@link LoadBalancingPolicy} object to a service config JSON object.
|
||||
*/
|
||||
private static ImmutableMap<String, ?> convertToServiceConfig(
|
||||
LoadBalancingPolicy loadBalancingPolicy, int recursionDepth)
|
||||
throws ResourceInvalidException, MaxRecursionReachedException {
|
||||
if (recursionDepth > MAX_RECURSION) {
|
||||
throw new MaxRecursionReachedException();
|
||||
}
|
||||
ImmutableMap<String, ?> serviceConfig = null;
|
||||
|
||||
for (Policy policy : loadBalancingPolicy.getPoliciesList()) {
|
||||
Any typedConfig = policy.getTypedExtensionConfig().getTypedConfig();
|
||||
try {
|
||||
if (typedConfig.is(RingHash.class)) {
|
||||
serviceConfig = convertRingHashConfig(typedConfig.unpack(RingHash.class));
|
||||
} else if (typedConfig.is(WrrLocality.class)) {
|
||||
serviceConfig = convertWrrLocalityConfig(typedConfig.unpack(WrrLocality.class),
|
||||
recursionDepth);
|
||||
} else if (typedConfig.is(RoundRobin.class)) {
|
||||
serviceConfig = convertRoundRobinConfig();
|
||||
} else if (typedConfig.is(TypedStruct.class)) {
|
||||
serviceConfig = convertCustomConfig(typedConfig.unpack(TypedStruct.class));
|
||||
}
|
||||
// TODO: support least_request once it is added to the envoy protos.
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new ResourceInvalidException(
|
||||
"Unable to unpack typedConfig for: " + typedConfig.getTypeUrl(), e);
|
||||
}
|
||||
// The service config is expected to have a single root entry, where the name of that entry
|
||||
// is the name of the policy. A Load balancer with this name must exist in the registry.
|
||||
if (serviceConfig == null || LoadBalancerRegistry.getDefaultRegistry()
|
||||
.getProvider(Iterables.getOnlyElement(serviceConfig.keySet())) == null) {
|
||||
logger.log(XdsLogLevel.WARNING, "Policy {0} not found in the LB registry, skipping",
|
||||
typedConfig.getTypeUrl());
|
||||
continue;
|
||||
} else {
|
||||
return serviceConfig;
|
||||
}
|
||||
}
|
||||
|
||||
// If we could not find a Policy that we could both convert as well as find a provider for
|
||||
// then we have an invalid LB policy configuration.
|
||||
throw new ResourceInvalidException("Invalid LoadBalancingPolicy: " + loadBalancingPolicy);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a ring_hash {@link Any} configuration to service config format.
|
||||
*/
|
||||
private static ImmutableMap<String, ?> convertRingHashConfig(RingHash ringHash)
|
||||
throws InvalidProtocolBufferException, ResourceInvalidException {
|
||||
// The hash function needs to be validated here as it is not exposed in the returned
|
||||
// configuration for later validation.
|
||||
if (RingHash.HashFunction.XX_HASH != ringHash.getHashFunction()) {
|
||||
throw new ResourceInvalidException(
|
||||
"Invalid ring hash function: " + ringHash.getHashFunction());
|
||||
}
|
||||
|
||||
return buildRingHashConfig(
|
||||
ringHash.hasMinimumRingSize() ? ringHash.getMinimumRingSize().getValue() : null,
|
||||
ringHash.hasMaximumRingSize() ? ringHash.getMaximumRingSize().getValue() : null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a wrr_locality {@link Any} configuration to service config format.
|
||||
*/
|
||||
private static ImmutableMap<String, ?> convertWrrLocalityConfig(WrrLocality wrrLocality,
|
||||
int recursionDepth) throws InvalidProtocolBufferException, ResourceInvalidException,
|
||||
MaxRecursionReachedException {
|
||||
return buildWrrLocalityConfig(
|
||||
convertToServiceConfig(wrrLocality.getEndpointPickingPolicy(), recursionDepth + 1));
|
||||
}
|
||||
|
||||
/**
|
||||
* "Converts" a round_robin configuration to service config format.
|
||||
*/
|
||||
private static ImmutableMap<String, ?> convertRoundRobinConfig() {
|
||||
return buildRoundRobinConfig();
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a custom LB config {@link Any} configuration to service config format.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private static ImmutableMap<String, ?> convertCustomConfig(TypedStruct configTypedStruct)
|
||||
throws InvalidProtocolBufferException, ResourceInvalidException {
|
||||
Object rawJsonConfig = null;
|
||||
try {
|
||||
rawJsonConfig = JsonParser.parse(JsonFormat.printer().print(configTypedStruct.getValue()));
|
||||
} catch (IOException e) {
|
||||
throw new ResourceInvalidException("Unable to parse custom LB config JSON", e);
|
||||
}
|
||||
|
||||
if (!(rawJsonConfig instanceof Map)) {
|
||||
throw new ResourceInvalidException("Custom LB config does not contain a JSON object");
|
||||
}
|
||||
|
||||
String customConfigTypeName = configTypedStruct.getTypeUrl();
|
||||
if (customConfigTypeName.contains("/")) {
|
||||
customConfigTypeName = customConfigTypeName.substring(
|
||||
customConfigTypeName.lastIndexOf("/") + 1);
|
||||
}
|
||||
|
||||
return ImmutableMap.of(customConfigTypeName, (Map<String, ?>) rawJsonConfig);
|
||||
}
|
||||
|
||||
// Used to signal that the LB config goes too deep.
|
||||
static class MaxRecursionReachedException extends Exception {
|
||||
static final long serialVersionUID = 1L;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a JSON LB configuration based on the old style of using the xDS Cluster proto message.
|
||||
* The lb_policy field is used to select the policy and configuration is extracted from various
|
||||
* policy specific fields in Cluster.
|
||||
*/
|
||||
static class LegacyLoadBalancingPolicyConverter {
|
||||
|
||||
/**
|
||||
* Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link
|
||||
* Cluster}.
|
||||
*
|
||||
* @throws ResourceInvalidException If the {@link Cluster} has an invalid LB configuration.
|
||||
*/
|
||||
static ImmutableMap<String, ?> convertToServiceConfig(Cluster cluster,
|
||||
boolean enableLeastRequest) throws ResourceInvalidException {
|
||||
switch (cluster.getLbPolicy()) {
|
||||
case RING_HASH:
|
||||
return convertRingHashConfig(cluster);
|
||||
case ROUND_ROBIN:
|
||||
return buildWrrLocalityConfig(buildRoundRobinConfig());
|
||||
case LEAST_REQUEST:
|
||||
if (enableLeastRequest) {
|
||||
return buildWrrLocalityConfig(convertLeastRequestConfig(cluster));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
}
|
||||
throw new ResourceInvalidException(
|
||||
"Cluster " + cluster.getName() + ": unsupported lb policy: " + cluster.getLbPolicy());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new ring_hash service config JSON object based on the old {@link RingHashLbConfig}
|
||||
* config message.
|
||||
*/
|
||||
private static ImmutableMap<String, ?> convertRingHashConfig(Cluster cluster)
|
||||
throws ResourceInvalidException {
|
||||
RingHashLbConfig lbConfig = cluster.getRingHashLbConfig();
|
||||
|
||||
// The hash function needs to be validated here as it is not exposed in the returned
|
||||
// configuration for later validation.
|
||||
if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH) {
|
||||
throw new ResourceInvalidException(
|
||||
"Cluster " + cluster.getName() + ": invalid ring hash function: " + lbConfig);
|
||||
}
|
||||
|
||||
return buildRingHashConfig(
|
||||
lbConfig.hasMinimumRingSize() ? (Long) lbConfig.getMinimumRingSize().getValue() : null,
|
||||
lbConfig.hasMaximumRingSize() ? (Long) lbConfig.getMaximumRingSize().getValue() : null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new least_request service config JSON object based on the old {@link
|
||||
* LeastRequestLbConfig} config message.
|
||||
*/
|
||||
private static ImmutableMap<String, ?> convertLeastRequestConfig(Cluster cluster) {
|
||||
LeastRequestLbConfig lbConfig = cluster.getLeastRequestLbConfig();
|
||||
return buildLeastRequestConfig(
|
||||
lbConfig.hasChoiceCount() ? (Integer) lbConfig.getChoiceCount().getValue() : null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -105,6 +105,7 @@ import io.grpc.InsecureChannelCredentials;
|
|||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancerRegistry;
|
||||
import io.grpc.Status.Code;
|
||||
import io.grpc.internal.JsonUtil;
|
||||
import io.grpc.internal.ServiceConfigUtil;
|
||||
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||
import io.grpc.lookup.v1.GrpcKeyBuilder;
|
||||
|
|
@ -1760,7 +1761,10 @@ public class ClientXdsClientDataTest {
|
|||
cluster, new HashSet<String>(), null, LRS_SERVER_INFO,
|
||||
LoadBalancerRegistry.getDefaultRegistry());
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(update.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("least_request_experimental");
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("least_request_experimental");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -1628,8 +1628,11 @@ public abstract class ClientXdsClientTestBase {
|
|||
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
|
||||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
|
||||
assertThat(cdsUpdate.edsServiceName()).isNull();
|
||||
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
|
||||
.getPolicyName()).isEqualTo("round_robin");
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(cdsUpdate.lrsServerInfo()).isNull();
|
||||
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
|
||||
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
|
||||
|
|
@ -1651,8 +1654,11 @@ public abstract class ClientXdsClientTestBase {
|
|||
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
|
||||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
|
||||
assertThat(cdsUpdate.edsServiceName()).isNull();
|
||||
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
|
||||
.getPolicyName()).isEqualTo("round_robin");
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(cdsUpdate.lrsServerInfo()).isNull();
|
||||
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
|
||||
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
|
||||
|
|
@ -1680,8 +1686,11 @@ public abstract class ClientXdsClientTestBase {
|
|||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
|
||||
assertThat(cdsUpdate.edsServiceName()).isNull();
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("least_request_experimental");
|
||||
assertThat(lbConfig.getRawConfigValue().get("choiceCount")).isEqualTo(3);
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("least_request_experimental");
|
||||
assertThat(childConfigs.get(0).getRawConfigValue().get("choiceCount")).isEqualTo(3);
|
||||
assertThat(cdsUpdate.lrsServerInfo()).isNull();
|
||||
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
|
||||
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
|
||||
|
|
@ -1736,8 +1745,11 @@ public abstract class ClientXdsClientTestBase {
|
|||
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
|
||||
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
|
||||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.AGGREGATE);
|
||||
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
|
||||
.getPolicyName()).isEqualTo("round_robin");
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(cdsUpdate.prioritizedClusterNames()).containsExactlyElementsIn(candidates).inOrder();
|
||||
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, clusterAggregate, VERSION_1, TIME_INCREMENT);
|
||||
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
|
||||
|
|
@ -1758,8 +1770,11 @@ public abstract class ClientXdsClientTestBase {
|
|||
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
|
||||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
|
||||
assertThat(cdsUpdate.edsServiceName()).isNull();
|
||||
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
|
||||
.getPolicyName()).isEqualTo("round_robin");
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(cdsUpdate.lrsServerInfo()).isNull();
|
||||
assertThat(cdsUpdate.maxConcurrentRequests()).isEqualTo(200L);
|
||||
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
|
||||
|
|
@ -1903,8 +1918,11 @@ public abstract class ClientXdsClientTestBase {
|
|||
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
|
||||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
|
||||
assertThat(cdsUpdate.edsServiceName()).isNull();
|
||||
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
|
||||
.getPolicyName()).isEqualTo("round_robin");
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(cdsUpdate.lrsServerInfo()).isNull();
|
||||
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
|
||||
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
|
||||
|
|
@ -1946,8 +1964,11 @@ public abstract class ClientXdsClientTestBase {
|
|||
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
|
||||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS);
|
||||
assertThat(cdsUpdate.dnsHostName()).isEqualTo(dnsHostAddr + ":" + dnsHostPort);
|
||||
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
|
||||
.getPolicyName()).isEqualTo("round_robin");
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(cdsUpdate.lrsServerInfo()).isNull();
|
||||
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
|
||||
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
|
||||
|
|
@ -1966,8 +1987,11 @@ public abstract class ClientXdsClientTestBase {
|
|||
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
|
||||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
|
||||
assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
|
||||
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
|
||||
.getPolicyName()).isEqualTo("round_robin");
|
||||
lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
|
||||
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
|
||||
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
|
||||
|
|
@ -2035,8 +2059,11 @@ public abstract class ClientXdsClientTestBase {
|
|||
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
|
||||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
|
||||
assertThat(cdsUpdate.edsServiceName()).isNull();
|
||||
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
|
||||
.getPolicyName()).isEqualTo("round_robin");
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(cdsUpdate.lrsServerInfo()).isNull();
|
||||
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
|
||||
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
|
||||
|
|
@ -2088,8 +2115,11 @@ public abstract class ClientXdsClientTestBase {
|
|||
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
|
||||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS);
|
||||
assertThat(cdsUpdate.dnsHostName()).isEqualTo(dnsHostAddr + ":" + dnsHostPort);
|
||||
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
|
||||
.getPolicyName()).isEqualTo("round_robin");
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(cdsUpdate.lrsServerInfo()).isNull();
|
||||
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
|
||||
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
|
||||
|
|
@ -2098,8 +2128,11 @@ public abstract class ClientXdsClientTestBase {
|
|||
assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo);
|
||||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
|
||||
assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
|
||||
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
|
||||
.getPolicyName()).isEqualTo("round_robin");
|
||||
lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
|
||||
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
|
||||
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
|
||||
|
|
@ -2108,8 +2141,11 @@ public abstract class ClientXdsClientTestBase {
|
|||
assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo);
|
||||
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
|
||||
assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
|
||||
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
|
||||
.getPolicyName()).isEqualTo("round_robin");
|
||||
lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
|
||||
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
|
||||
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import static com.google.common.truth.Truth.assertThat;
|
|||
import static io.grpc.xds.XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME;
|
||||
import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME;
|
||||
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
|
||||
import static io.grpc.xds.XdsLbPolicies.WRR_LOCALITY_POLICY_NAME;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
|
@ -69,8 +70,7 @@ import io.grpc.xds.LeastRequestLoadBalancer.LeastRequestConfig;
|
|||
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
|
||||
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
|
||||
import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
|
||||
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection;
|
||||
import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig;
|
||||
import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig;
|
||||
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
|
|
@ -133,12 +133,15 @@ public class ClusterResolverLoadBalancerTest {
|
|||
private final FakeClock fakeClock = new FakeClock();
|
||||
private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
|
||||
private final NameResolverRegistry nsRegistry = new NameResolverRegistry();
|
||||
private final PolicySelection roundRobin =
|
||||
new PolicySelection(new FakeLoadBalancerProvider("round_robin"), null);
|
||||
private final PolicySelection roundRobin = new PolicySelection(
|
||||
new FakeLoadBalancerProvider("wrr_locality_experimental"), new WrrLocalityConfig(
|
||||
new PolicySelection(new FakeLoadBalancerProvider("round_robin"), null)));
|
||||
private final PolicySelection ringHash = new PolicySelection(
|
||||
new FakeLoadBalancerProvider("ring_hash_experimental"), new RingHashConfig(10L, 100L));
|
||||
private final PolicySelection leastRequest = new PolicySelection(
|
||||
new FakeLoadBalancerProvider("least_request_experimental"), new LeastRequestConfig(3));
|
||||
new FakeLoadBalancerProvider("wrr_locality_experimental"), new WrrLocalityConfig(
|
||||
new PolicySelection(new FakeLoadBalancerProvider("least_request_experimental"),
|
||||
new LeastRequestConfig(3))));
|
||||
private final List<FakeLoadBalancer> childBalancers = new ArrayList<>();
|
||||
private final List<FakeNameResolver> resolvers = new ArrayList<>();
|
||||
private final FakeXdsClient xdsClient = new FakeXdsClient();
|
||||
|
|
@ -303,10 +306,16 @@ public class ClusterResolverLoadBalancerTest {
|
|||
ClusterImplConfig clusterImplConfig =
|
||||
(ClusterImplConfig) priorityChildConfig.policySelection.getConfig();
|
||||
assertClusterImplConfig(clusterImplConfig, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L,
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WEIGHTED_TARGET_POLICY_NAME);
|
||||
WeightedTargetConfig weightedTargetConfig =
|
||||
(WeightedTargetConfig) clusterImplConfig.childPolicy.getConfig();
|
||||
assertThat(weightedTargetConfig.targets.keySet()).containsExactly(locality1.toString());
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WRR_LOCALITY_POLICY_NAME);
|
||||
WrrLocalityConfig wrrLocalityConfig =
|
||||
(WrrLocalityConfig) clusterImplConfig.childPolicy.getConfig();
|
||||
assertThat(wrrLocalityConfig.childPolicy.getProvider().getPolicyName()).isEqualTo(
|
||||
"least_request_experimental");
|
||||
|
||||
Map<Locality, Integer> localityWeights = childBalancer.attributes.get(
|
||||
InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
|
||||
assertThat(localityWeights).containsKey(locality1);
|
||||
assertThat(localityWeights.get(locality1)).isEqualTo(100);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -366,13 +375,12 @@ public class ClusterResolverLoadBalancerTest {
|
|||
ClusterImplConfig clusterImplConfig1 =
|
||||
(ClusterImplConfig) priorityChildConfig1.policySelection.getConfig();
|
||||
assertClusterImplConfig(clusterImplConfig1, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L,
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WEIGHTED_TARGET_POLICY_NAME);
|
||||
WeightedTargetConfig weightedTargetConfig1 =
|
||||
(WeightedTargetConfig) clusterImplConfig1.childPolicy.getConfig();
|
||||
assertThat(weightedTargetConfig1.targets.keySet()).containsExactly(locality1.toString());
|
||||
WeightedPolicySelection target1 = weightedTargetConfig1.targets.get(locality1.toString());
|
||||
assertThat(target1.weight).isEqualTo(70);
|
||||
assertThat(target1.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin");
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WRR_LOCALITY_POLICY_NAME);
|
||||
assertThat(clusterImplConfig1.childPolicy.getConfig()).isInstanceOf(WrrLocalityConfig.class);
|
||||
WrrLocalityConfig wrrLocalityConfig1 =
|
||||
(WrrLocalityConfig) clusterImplConfig1.childPolicy.getConfig();
|
||||
assertThat(wrrLocalityConfig1.childPolicy.getProvider().getPolicyName()).isEqualTo(
|
||||
"round_robin");
|
||||
|
||||
PriorityChildConfig priorityChildConfig2 = priorityLbConfig.childConfigs.get(priority2);
|
||||
assertThat(priorityChildConfig2.ignoreReresolution).isTrue();
|
||||
|
|
@ -381,21 +389,12 @@ public class ClusterResolverLoadBalancerTest {
|
|||
ClusterImplConfig clusterImplConfig2 =
|
||||
(ClusterImplConfig) priorityChildConfig2.policySelection.getConfig();
|
||||
assertClusterImplConfig(clusterImplConfig2, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L,
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WEIGHTED_TARGET_POLICY_NAME);
|
||||
WeightedTargetConfig weightedTargetConfig2 =
|
||||
(WeightedTargetConfig) clusterImplConfig2.childPolicy.getConfig();
|
||||
assertThat(weightedTargetConfig2.targets.keySet()).containsExactly(locality3.toString());
|
||||
WeightedPolicySelection target2 = weightedTargetConfig2.targets.get(locality3.toString());
|
||||
assertThat(target2.weight).isEqualTo(20);
|
||||
assertThat(target2.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin");
|
||||
List<EquivalentAddressGroup> priorityAddrs1 =
|
||||
AddressFilter.filter(childBalancer.addresses, priority1);
|
||||
assertThat(priorityAddrs1).hasSize(2);
|
||||
assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), priorityAddrs1);
|
||||
List<EquivalentAddressGroup> priorityAddrs2 =
|
||||
AddressFilter.filter(childBalancer.addresses, priority2);
|
||||
assertThat(priorityAddrs2).hasSize(1);
|
||||
assertAddressesEqual(Collections.singletonList(endpoint4), priorityAddrs2);
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WRR_LOCALITY_POLICY_NAME);
|
||||
assertThat(clusterImplConfig2.childPolicy.getConfig()).isInstanceOf(WrrLocalityConfig.class);
|
||||
WrrLocalityConfig wrrLocalityConfig2 =
|
||||
(WrrLocalityConfig) clusterImplConfig1.childPolicy.getConfig();
|
||||
assertThat(wrrLocalityConfig2.childPolicy.getProvider().getPolicyName()).isEqualTo(
|
||||
"round_robin");
|
||||
|
||||
PriorityChildConfig priorityChildConfig3 = priorityLbConfig.childConfigs.get(priority3);
|
||||
assertThat(priorityChildConfig3.ignoreReresolution).isTrue();
|
||||
|
|
@ -404,17 +403,21 @@ public class ClusterResolverLoadBalancerTest {
|
|||
ClusterImplConfig clusterImplConfig3 =
|
||||
(ClusterImplConfig) priorityChildConfig3.policySelection.getConfig();
|
||||
assertClusterImplConfig(clusterImplConfig3, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L,
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WEIGHTED_TARGET_POLICY_NAME);
|
||||
WeightedTargetConfig weightedTargetConfig3 =
|
||||
(WeightedTargetConfig) clusterImplConfig3.childPolicy.getConfig();
|
||||
assertThat(weightedTargetConfig3.targets.keySet()).containsExactly(locality2.toString());
|
||||
WeightedPolicySelection target3 = weightedTargetConfig3.targets.get(locality2.toString());
|
||||
assertThat(target3.weight).isEqualTo(10);
|
||||
assertThat(target3.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin");
|
||||
List<EquivalentAddressGroup> priorityAddrs3 =
|
||||
AddressFilter.filter(childBalancer.addresses, priority3);
|
||||
assertThat(priorityAddrs3).hasSize(1);
|
||||
assertAddressesEqual(Collections.singletonList(endpoint3), priorityAddrs3);
|
||||
tlsContext, Collections.<DropOverload>emptyList(), WRR_LOCALITY_POLICY_NAME);
|
||||
assertThat(clusterImplConfig3.childPolicy.getConfig()).isInstanceOf(WrrLocalityConfig.class);
|
||||
WrrLocalityConfig wrrLocalityConfig3 =
|
||||
(WrrLocalityConfig) clusterImplConfig1.childPolicy.getConfig();
|
||||
assertThat(wrrLocalityConfig3.childPolicy.getProvider().getPolicyName()).isEqualTo(
|
||||
"round_robin");
|
||||
|
||||
Map<Locality, Integer> localityWeights = childBalancer.attributes.get(
|
||||
InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
|
||||
assertThat(localityWeights).containsKey(locality1);
|
||||
assertThat(localityWeights.get(locality1)).isEqualTo(70);
|
||||
assertThat(localityWeights).containsKey(locality2);
|
||||
assertThat(localityWeights.get(locality2)).isEqualTo(10);
|
||||
assertThat(localityWeights).containsKey(locality3);
|
||||
assertThat(localityWeights.get(locality3)).isEqualTo(20);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -510,19 +513,14 @@ public class ClusterResolverLoadBalancerTest {
|
|||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */)),
|
||||
10 /* localityWeight */, 1 /* priority */);
|
||||
String priority = CLUSTER1 + "[priority1]";
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1,
|
||||
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
|
||||
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config;
|
||||
PriorityChildConfig priorityChildConfig = priorityLbConfig.childConfigs.get(priority);
|
||||
ClusterImplConfig clusterImplConfig =
|
||||
(ClusterImplConfig) priorityChildConfig.policySelection.getConfig();
|
||||
WeightedTargetConfig weightedTargetConfig =
|
||||
(WeightedTargetConfig) clusterImplConfig.childPolicy.getConfig();
|
||||
assertThat(weightedTargetConfig.targets.keySet()).containsExactly(locality2.toString());
|
||||
Map<Locality, Integer> localityWeights = childBalancer.attributes.get(
|
||||
InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
|
||||
assertThat(localityWeights.keySet()).containsExactly(locality2);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -1142,6 +1140,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
private final Helper helper;
|
||||
private List<EquivalentAddressGroup> addresses;
|
||||
private Object config;
|
||||
private Attributes attributes;
|
||||
private Status upstreamError;
|
||||
private boolean shutdown;
|
||||
|
||||
|
|
@ -1154,6 +1153,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
addresses = resolvedAddresses.getAddresses();
|
||||
config = resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||
attributes = resolvedAddresses.getAttributes();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -1,120 +0,0 @@
|
|||
/*
|
||||
* Copyright 2022 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import com.google.protobuf.UInt32Value;
|
||||
import com.google.protobuf.UInt64Value;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig.HashFunction;
|
||||
import io.grpc.internal.JsonUtil;
|
||||
import io.grpc.internal.ServiceConfigUtil;
|
||||
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||
import io.grpc.xds.ClientXdsClient.ResourceInvalidException;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
/**
|
||||
* Unit test for {@link LegacyLoadBalancerConfigFactory}.
|
||||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class LegacyLoadBalancerConfigFactoryTest {
|
||||
|
||||
@Test
|
||||
public void roundRobin() throws ResourceInvalidException {
|
||||
Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.ROUND_ROBIN).build();
|
||||
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LegacyLoadBalancerConfigFactory.newConfig(cluster, true));
|
||||
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(lbConfig.getRawConfigValue()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ringHash() throws ResourceInvalidException {
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setLbPolicy(LbPolicy.RING_HASH)
|
||||
.setRingHashLbConfig(
|
||||
RingHashLbConfig.newBuilder()
|
||||
.setMinimumRingSize(UInt64Value.of(1))
|
||||
.setMaximumRingSize(UInt64Value.of(2)))
|
||||
.build();
|
||||
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LegacyLoadBalancerConfigFactory.newConfig(cluster, true));
|
||||
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("ring_hash_experimental");
|
||||
assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "minRingSize")).isEqualTo(1);
|
||||
assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "maxRingSize")).isEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ringHash_invalidHash() throws ResourceInvalidException {
|
||||
Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.RING_HASH).setRingHashLbConfig(
|
||||
RingHashLbConfig.newBuilder().setHashFunction(HashFunction.MURMUR_HASH_2)).build();
|
||||
|
||||
try {
|
||||
ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LegacyLoadBalancerConfigFactory.newConfig(cluster, true));
|
||||
} catch (ResourceInvalidException e) {
|
||||
assertThat(e).hasMessageThat().contains("invalid ring hash function");
|
||||
return;
|
||||
}
|
||||
fail("ResourceInvalidException not thrown");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void leastRequest() throws ResourceInvalidException {
|
||||
System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "true");
|
||||
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setLbPolicy(LbPolicy.LEAST_REQUEST)
|
||||
.setLeastRequestLbConfig(
|
||||
LeastRequestLbConfig.newBuilder().setChoiceCount(UInt32Value.of(10)))
|
||||
.build();
|
||||
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LegacyLoadBalancerConfigFactory.newConfig(cluster, true));
|
||||
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("least_request_experimental");
|
||||
assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "choiceCount")).isEqualTo(10);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void leastRequest_notEnabled() throws ResourceInvalidException {
|
||||
System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "false");
|
||||
|
||||
Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.LEAST_REQUEST).build();
|
||||
|
||||
try {
|
||||
ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LegacyLoadBalancerConfigFactory.newConfig(cluster, false));
|
||||
} catch (ResourceInvalidException e) {
|
||||
assertThat(e).hasMessageThat().contains("unsupported lb policy");
|
||||
return;
|
||||
}
|
||||
fail("ResourceInvalidException not thrown");
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,356 @@
|
|||
/*
|
||||
* Copyright 2022 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import com.github.xds.type.v3.TypedStruct;
|
||||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.Struct;
|
||||
import com.google.protobuf.UInt32Value;
|
||||
import com.google.protobuf.UInt64Value;
|
||||
import com.google.protobuf.Value;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig.HashFunction;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy;
|
||||
import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy.Policy;
|
||||
import io.envoyproxy.envoy.config.core.v3.TypedExtensionConfig;
|
||||
import io.envoyproxy.envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash;
|
||||
import io.envoyproxy.envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin;
|
||||
import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
import io.grpc.LoadBalancerRegistry;
|
||||
import io.grpc.internal.JsonUtil;
|
||||
import io.grpc.internal.ServiceConfigUtil;
|
||||
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||
import io.grpc.xds.ClientXdsClient.ResourceInvalidException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
/**
|
||||
* Unit test for {@link LoadBalancerConfigFactory}.
|
||||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class LoadBalancerConfigFactoryTest {
|
||||
|
||||
private static final Policy ROUND_ROBIN_POLICY = Policy.newBuilder().setTypedExtensionConfig(
|
||||
TypedExtensionConfig.newBuilder().setTypedConfig(
|
||||
Any.pack(RoundRobin.newBuilder().build()))).build();
|
||||
|
||||
private static final long RING_HASH_MIN_RING_SIZE = 1;
|
||||
private static final long RING_HASH_MAX_RING_SIZE = 2;
|
||||
private static final Policy RING_HASH_POLICY = Policy.newBuilder().setTypedExtensionConfig(
|
||||
TypedExtensionConfig.newBuilder().setTypedConfig(Any.pack(
|
||||
RingHash.newBuilder()
|
||||
.setMinimumRingSize(UInt64Value.of(RING_HASH_MIN_RING_SIZE))
|
||||
.setMaximumRingSize(UInt64Value.of(RING_HASH_MAX_RING_SIZE))
|
||||
.setHashFunction(RingHash.HashFunction.XX_HASH).build()))).build();
|
||||
|
||||
private static final String CUSTOM_POLICY_NAME = "myorg.MyCustomLeastRequestPolicy";
|
||||
private static final String CUSTOM_POLICY_FIELD_KEY = "choiceCount";
|
||||
private static final double CUSTOM_POLICY_FIELD_VALUE = 2;
|
||||
private static final Policy CUSTOM_POLICY = Policy.newBuilder().setTypedExtensionConfig(
|
||||
TypedExtensionConfig.newBuilder().setTypedConfig(Any.pack(TypedStruct.newBuilder()
|
||||
.setTypeUrl("type.googleapis.com/" + CUSTOM_POLICY_NAME).setValue(
|
||||
Struct.newBuilder()
|
||||
.putFields(CUSTOM_POLICY_FIELD_KEY,
|
||||
Value.newBuilder().setNumberValue(CUSTOM_POLICY_FIELD_VALUE).build()))
|
||||
.build()))).build();
|
||||
private static final FakeCustomLoadBalancerProvider CUSTOM_POLICY_PROVIDER
|
||||
= new FakeCustomLoadBalancerProvider();
|
||||
|
||||
private static Policy buildWrrPolicy(Policy childPolicy) {
|
||||
return Policy.newBuilder().setTypedExtensionConfig(TypedExtensionConfig.newBuilder()
|
||||
.setTypedConfig(Any.pack(WrrLocality.newBuilder()
|
||||
.setEndpointPickingPolicy(LoadBalancingPolicy.newBuilder().addPolicies(childPolicy))
|
||||
.build()))).build();
|
||||
}
|
||||
|
||||
@After
|
||||
public void deregisterCustomProvider() {
|
||||
LoadBalancerRegistry.getDefaultRegistry().deregister(CUSTOM_POLICY_PROVIDER);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void roundRobin() throws ResourceInvalidException {
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setLoadBalancingPolicy(
|
||||
LoadBalancingPolicy.newBuilder().addPolicies(buildWrrPolicy(ROUND_ROBIN_POLICY)))
|
||||
.build();
|
||||
|
||||
assertValidRoundRobin(ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LoadBalancerConfigFactory.newConfig(cluster, true)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void roundRobin_legacy() throws ResourceInvalidException {
|
||||
Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.ROUND_ROBIN).build();
|
||||
|
||||
assertValidRoundRobin(ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LoadBalancerConfigFactory.newConfig(cluster, true)));
|
||||
}
|
||||
|
||||
private void assertValidRoundRobin(LbConfig lbConfig) {
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
(List<Map<String, ?>>) lbConfig.getRawConfigValue().get("childPolicy"));
|
||||
assertThat(childConfigs).hasSize(1);
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
|
||||
assertThat(childConfigs.get(0).getRawConfigValue()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ringHash() throws ResourceInvalidException {
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setLoadBalancingPolicy(LoadBalancingPolicy.newBuilder().addPolicies(RING_HASH_POLICY))
|
||||
.build();
|
||||
|
||||
assertValidRingHash(ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LoadBalancerConfigFactory.newConfig(cluster, true)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ringHash_legacy() throws ResourceInvalidException {
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setLbPolicy(LbPolicy.RING_HASH)
|
||||
.setRingHashLbConfig(
|
||||
RingHashLbConfig.newBuilder()
|
||||
.setMinimumRingSize(UInt64Value.of(RING_HASH_MIN_RING_SIZE))
|
||||
.setMaximumRingSize(UInt64Value.of(RING_HASH_MAX_RING_SIZE))
|
||||
.setHashFunction(HashFunction.XX_HASH))
|
||||
.build();
|
||||
|
||||
assertValidRingHash(ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LoadBalancerConfigFactory.newConfig(cluster, true)));
|
||||
}
|
||||
|
||||
private void assertValidRingHash(LbConfig lbConfig) {
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("ring_hash_experimental");
|
||||
assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "minRingSize")).isEqualTo(
|
||||
RING_HASH_MIN_RING_SIZE);
|
||||
assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "maxRingSize")).isEqualTo(
|
||||
RING_HASH_MAX_RING_SIZE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ringHash_invalidHash() {
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setLoadBalancingPolicy(LoadBalancingPolicy.newBuilder()
|
||||
.addPolicies(Policy.newBuilder().setTypedExtensionConfig(
|
||||
TypedExtensionConfig.newBuilder().setTypedConfig(Any.pack(
|
||||
RingHash.newBuilder()
|
||||
.setMinimumRingSize(UInt64Value.of(RING_HASH_MIN_RING_SIZE))
|
||||
.setMaximumRingSize(UInt64Value.of(RING_HASH_MAX_RING_SIZE))
|
||||
.setHashFunction(RingHash.HashFunction.MURMUR_HASH_2).build()))).build()))
|
||||
.build();
|
||||
|
||||
try {
|
||||
ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LoadBalancerConfigFactory.newConfig(cluster, true));
|
||||
} catch (ResourceInvalidException e) {
|
||||
// With the new config mechanism we get a more generic error than with the old one because the
|
||||
// logic loops over potentially multiple configurations and only throws an exception at the
|
||||
// end if there was no valid policies found.
|
||||
assertThat(e).hasMessageThat().contains("Invalid ring hash function");
|
||||
return;
|
||||
}
|
||||
fail("ResourceInvalidException not thrown");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ringHash_invalidHash_legacy() {
|
||||
Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.RING_HASH).setRingHashLbConfig(
|
||||
RingHashLbConfig.newBuilder().setHashFunction(HashFunction.MURMUR_HASH_2)).build();
|
||||
|
||||
try {
|
||||
ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LoadBalancerConfigFactory.newConfig(cluster, true));
|
||||
} catch (ResourceInvalidException e) {
|
||||
assertThat(e).hasMessageThat().contains("invalid ring hash function");
|
||||
return;
|
||||
}
|
||||
fail("ResourceInvalidException not thrown");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void leastRequest() throws ResourceInvalidException {
|
||||
System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "true");
|
||||
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setLbPolicy(LbPolicy.LEAST_REQUEST)
|
||||
.setLeastRequestLbConfig(
|
||||
LeastRequestLbConfig.newBuilder().setChoiceCount(UInt32Value.of(10)))
|
||||
.build();
|
||||
|
||||
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LoadBalancerConfigFactory.newConfig(cluster, true));
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
(List<Map<String, ?>>) lbConfig.getRawConfigValue().get("childPolicy"));
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("least_request_experimental");
|
||||
assertThat(
|
||||
JsonUtil.getNumberAsLong(childConfigs.get(0).getRawConfigValue(), "choiceCount")).isEqualTo(
|
||||
10);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void leastRequest_notEnabled() {
|
||||
System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "false");
|
||||
|
||||
Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.LEAST_REQUEST).build();
|
||||
|
||||
try {
|
||||
ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LoadBalancerConfigFactory.newConfig(cluster, false));
|
||||
} catch (ResourceInvalidException e) {
|
||||
assertThat(e).hasMessageThat().contains("unsupported lb policy");
|
||||
return;
|
||||
}
|
||||
fail("ResourceInvalidException not thrown");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customConfiguration() throws ResourceInvalidException {
|
||||
LoadBalancerRegistry.getDefaultRegistry().register(CUSTOM_POLICY_PROVIDER);
|
||||
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setLoadBalancingPolicy(
|
||||
LoadBalancingPolicy.newBuilder().addPolicies(buildWrrPolicy(CUSTOM_POLICY)))
|
||||
.build();
|
||||
|
||||
assertValidCustomConfig(ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LoadBalancerConfigFactory.newConfig(cluster, false)));
|
||||
}
|
||||
|
||||
// When a provider for the custom policy is available, the configuration should use it.
|
||||
@Test
|
||||
public void complexCustomConfig_customProviderRegistered() throws ResourceInvalidException {
|
||||
LoadBalancerRegistry.getDefaultRegistry().register(CUSTOM_POLICY_PROVIDER);
|
||||
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setLoadBalancingPolicy(
|
||||
LoadBalancingPolicy.newBuilder().addPolicies(buildWrrPolicy(CUSTOM_POLICY))
|
||||
.addPolicies(buildWrrPolicy(ROUND_ROBIN_POLICY)))
|
||||
.build();
|
||||
|
||||
assertValidCustomConfig(ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LoadBalancerConfigFactory.newConfig(cluster, false)));
|
||||
}
|
||||
|
||||
// When a provider for the custom policy is NOT available, we still fail even if there is another
|
||||
// round_robin configuration in the list as the wrr_locality the custom config is wrapped in is
|
||||
// a recognized type and expected to have a valid config.
|
||||
@Test
|
||||
public void complexCustomConfig_customProviderNotRegistered() throws ResourceInvalidException {
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setLoadBalancingPolicy(
|
||||
LoadBalancingPolicy.newBuilder().addPolicies(buildWrrPolicy(CUSTOM_POLICY))
|
||||
.addPolicies(buildWrrPolicy(ROUND_ROBIN_POLICY)))
|
||||
.build();
|
||||
|
||||
try {
|
||||
ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||
LoadBalancerConfigFactory.newConfig(cluster, false));
|
||||
} catch (ResourceInvalidException e) {
|
||||
assertThat(e).hasMessageThat().contains("Invalid LoadBalancingPolicy");
|
||||
return;
|
||||
}
|
||||
fail("ResourceInvalidException not thrown");
|
||||
}
|
||||
|
||||
private void assertValidCustomConfig(LbConfig lbConfig) {
|
||||
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
|
||||
@SuppressWarnings("unchecked")
|
||||
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
|
||||
(List<Map<String, ?>>) lbConfig.getRawConfigValue().get("childPolicy"));
|
||||
assertThat(childConfigs).hasSize(1);
|
||||
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo(CUSTOM_POLICY_NAME);
|
||||
assertThat(childConfigs.get(0).getRawConfigValue().get(CUSTOM_POLICY_FIELD_KEY)).isEqualTo(
|
||||
CUSTOM_POLICY_FIELD_VALUE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxRecursion() {
|
||||
Cluster cluster = Cluster.newBuilder()
|
||||
.setLoadBalancingPolicy(
|
||||
LoadBalancingPolicy.newBuilder().addPolicies(
|
||||
buildWrrPolicy( // Wheee...
|
||||
buildWrrPolicy( // ...eee...
|
||||
buildWrrPolicy( // ...eee!
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
buildWrrPolicy(
|
||||
ROUND_ROBIN_POLICY))))))))))))))))))).build();
|
||||
|
||||
try {
|
||||
LoadBalancerConfigFactory.newConfig(cluster, false);
|
||||
} catch (ResourceInvalidException e) {
|
||||
assertThat(e).hasMessageThat().contains("Maximum LB config recursion depth reached");
|
||||
return;
|
||||
}
|
||||
fail("Expected a ResourceInvalidException because of max recursion exceeded");
|
||||
}
|
||||
|
||||
private static class FakeCustomLoadBalancerProvider extends LoadBalancerProvider {
|
||||
|
||||
@Override
|
||||
public LoadBalancer newLoadBalancer(Helper helper) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAvailable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPriority() {
|
||||
return 5;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPolicyName() {
|
||||
return CUSTOM_POLICY_NAME;
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue