xds: ClientXdsClient to provide JSON LB configurations (v2) (#9095)

This refactoring is done in preparation of a larger change where LB configuration will be provided in the xDS Cluster proto message load_balancing_policy field. This field will allow for the configuration of custom LB policies with arbitrary configuration data.

- Instead of directly creating Java configuration objects, the client delegates to a new factory class to generate JSON configurations
- This factory is considered a "legacy" one as a separate factory will be introduced to build configs based on the new load_balancing_policy field
- The client will use a LoadBalancerProvider to parse the generated config to assure it is valid.
- Overlapping LB config validation that exists both in ClientXdsClient and LB providers will be removed from the client.

This is a second attempt at #8996 that was reverted by #9092.

The initial PR was reverted because the change caused the duplicate CDS update detection in ClientXdsClient to fail. This was because equality checking of PolicySelection instances cannot be relied on. This PR uses the JSON config instead - CdsLoadBalancer2 will handle the conversion from JSON config to PolicySelection.
This commit is contained in:
Terry Wilson 2022-04-21 14:18:42 -07:00 committed by GitHub
parent a5829107c3
commit 8e65700edc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 432 additions and 190 deletions

View File

@ -25,19 +25,19 @@ import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.LeastRequestLoadBalancer.LeastRequestConfig;
import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayDeque;
@ -185,22 +185,27 @@ final class CdsLoadBalancer2 extends LoadBalancer {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
return;
}
LoadBalancerProvider lbProvider = null;
Object lbConfig = null;
if (root.result.lbPolicy() == LbPolicy.RING_HASH) {
lbProvider = lbRegistry.getProvider("ring_hash_experimental");
lbConfig = new RingHashConfig(root.result.minRingSize(), root.result.maxRingSize());
}
if (root.result.lbPolicy() == LbPolicy.LEAST_REQUEST) {
lbProvider = lbRegistry.getProvider("least_request_experimental");
lbConfig = new LeastRequestConfig(root.result.choiceCount());
}
// The LB policy config is provided in service_config.proto/JSON format. It is unwrapped
// to determine the name of the policy in the load balancer registry.
LbConfig unwrappedLbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(
root.result.lbPolicyConfig());
LoadBalancerProvider lbProvider = lbRegistry.getProvider(unwrappedLbConfig.getPolicyName());
if (lbProvider == null) {
lbProvider = lbRegistry.getProvider("round_robin");
lbConfig = null;
throw NameResolver.ConfigOrError.fromError(Status.INVALID_ARGUMENT.withDescription(
"No provider available for LB: " + unwrappedLbConfig.getPolicyName())).getError()
.asRuntimeException();
}
NameResolver.ConfigOrError configOrError = lbProvider.parseLoadBalancingPolicyConfig(
unwrappedLbConfig.getRawConfigValue());
if (configOrError.getError() != null) {
throw configOrError.getError().augmentDescription("Unable to parse the LB config")
.asRuntimeException();
}
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.unmodifiableList(instances), new PolicySelection(lbProvider, lbConfig));
Collections.unmodifiableList(instances),
new PolicySelection(lbProvider, configOrError.getConfig()));
if (childLb == null) {
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
}

View File

@ -43,9 +43,6 @@ import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.CustomClusterType;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig;
import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions;
import io.envoyproxy.envoy.config.core.v3.RoutingPriority;
import io.envoyproxy.envoy.config.core.v3.SocketAddress;
@ -70,12 +67,16 @@ import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
import io.grpc.Grpc;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.internal.TimeProvider;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.AuthorityInfo;
@ -137,14 +138,6 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
private static final String TRANSPORT_SOCKET_NAME_TLS = "envoy.transport_sockets.tls";
@VisibleForTesting
static final long DEFAULT_RING_HASH_LB_POLICY_MIN_RING_SIZE = 1024L;
@VisibleForTesting
static final long DEFAULT_RING_HASH_LB_POLICY_MAX_RING_SIZE = 8 * 1024 * 1024L;
@VisibleForTesting
static final int DEFAULT_LEAST_REQUEST_CHOICE_COUNT = 2;
@VisibleForTesting
static final long MAX_RING_HASH_LB_POLICY_RING_SIZE = 8 * 1024 * 1024L;
@VisibleForTesting
static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate";
@VisibleForTesting
static final String HASH_POLICY_FILTER_STATE_KEY = "io.grpc.channel_id";
@ -211,6 +204,8 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
}
});
private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
private final LoadBalancerRegistry loadBalancerRegistry
= LoadBalancerRegistry.getDefaultRegistry();
private final Map<ServerInfo, AbstractXdsClient> serverChannelMap = new HashMap<>();
private final Map<String, ResourceSubscriber> ldsResourceSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> rdsResourceSubscribers = new HashMap<>();
@ -1598,8 +1593,8 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
if (getBootstrapInfo() != null && getBootstrapInfo().certProviders() != null) {
certProviderInstances = getBootstrapInfo().certProviders().keySet();
}
cdsUpdate =
processCluster(cluster, retainedEdsResources, certProviderInstances, serverInfo);
cdsUpdate = processCluster(cluster, retainedEdsResources, certProviderInstances, serverInfo,
loadBalancerRegistry);
} catch (ResourceInvalidException e) {
errors.add(
"CDS response Cluster '" + clusterName + "' validation error: " + e.getMessage());
@ -1618,7 +1613,8 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
@VisibleForTesting
static CdsUpdate processCluster(Cluster cluster, Set<String> retainedEdsResources,
Set<String> certProviderInstances, ServerInfo serverInfo)
Set<String> certProviderInstances, ServerInfo serverInfo,
LoadBalancerRegistry loadBalancerRegistry)
throws ResourceInvalidException {
StructOrError<CdsUpdate.Builder> structOrError;
switch (cluster.getClusterDiscoveryTypeCase()) {
@ -1639,41 +1635,22 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
}
CdsUpdate.Builder updateBuilder = structOrError.getStruct();
if (cluster.getLbPolicy() == LbPolicy.RING_HASH) {
RingHashLbConfig lbConfig = cluster.getRingHashLbConfig();
long minRingSize =
lbConfig.hasMinimumRingSize()
? lbConfig.getMinimumRingSize().getValue()
: DEFAULT_RING_HASH_LB_POLICY_MIN_RING_SIZE;
long maxRingSize =
lbConfig.hasMaximumRingSize()
? lbConfig.getMaximumRingSize().getValue()
: DEFAULT_RING_HASH_LB_POLICY_MAX_RING_SIZE;
if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH
|| minRingSize > maxRingSize
|| maxRingSize > MAX_RING_HASH_LB_POLICY_RING_SIZE) {
throw new ResourceInvalidException(
"Cluster " + cluster.getName() + ": invalid ring_hash_lb_config: " + lbConfig);
}
updateBuilder.ringHashLbPolicy(minRingSize, maxRingSize);
} else if (cluster.getLbPolicy() == LbPolicy.ROUND_ROBIN) {
updateBuilder.roundRobinLbPolicy();
} else if (enableLeastRequest && cluster.getLbPolicy() == LbPolicy.LEAST_REQUEST) {
LeastRequestLbConfig lbConfig = cluster.getLeastRequestLbConfig();
int choiceCount =
lbConfig.hasChoiceCount()
? lbConfig.getChoiceCount().getValue()
: DEFAULT_LEAST_REQUEST_CHOICE_COUNT;
if (choiceCount < DEFAULT_LEAST_REQUEST_CHOICE_COUNT) {
throw new ResourceInvalidException(
"Cluster " + cluster.getName() + ": invalid least_request_lb_config: " + lbConfig);
}
updateBuilder.leastRequestLbPolicy(choiceCount);
} else {
throw new ResourceInvalidException(
"Cluster " + cluster.getName() + ": unsupported lb policy: " + cluster.getLbPolicy());
// TODO: If load_balancing_policy is set in Cluster use it for LB config, otherwise fall back
// to using the legacy lb_policy field.
ImmutableMap<String, ?> lbPolicyConfig = LegacyLoadBalancerConfigFactory.newConfig(cluster,
enableLeastRequest);
// Validate the LB config by trying to parse it with the corresponding LB provider.
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(lbPolicyConfig);
NameResolver.ConfigOrError configOrError = loadBalancerRegistry.getProvider(
lbConfig.getPolicyName()).parseLoadBalancingPolicyConfig(
lbConfig.getRawConfigValue());
if (configOrError.getError() != null) {
throw new ResourceInvalidException(structOrError.getErrorDetail());
}
updateBuilder.lbPolicyConfig(lbPolicyConfig);
return updateBuilder.build();
}
@ -2580,11 +2557,11 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
static final class ResourceInvalidException extends Exception {
private static final long serialVersionUID = 0L;
private ResourceInvalidException(String message) {
ResourceInvalidException(String message) {
super(message, null, false, false);
}
private ResourceInvalidException(String message, Throwable cause) {
ResourceInvalidException(String message, Throwable cause) {
super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false);
}
}

View File

@ -0,0 +1,105 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import com.google.common.collect.ImmutableMap;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig;
import io.grpc.xds.ClientXdsClient.ResourceInvalidException;
/**
* Builds a JSON LB configuration based on the old style of using the xDS Cluster proto message. The
* lb_policy field is used to select the policy and configuration is extracted from various policy
* specific fields in Cluster.
*/
abstract class LegacyLoadBalancerConfigFactory {
static final String ROUND_ROBIN_FIELD_NAME = "round_robin";
static final String RING_HASH_FIELD_NAME = "ring_hash_experimental";
static final String MIN_RING_SIZE_FIELD_NAME = "minRingSize";
static final String MAX_RING_SIZE_FIELD_NAME = "maxRingSize";
static final String LEAST_REQUEST_FIELD_NAME = "least_request_experimental";
static final String CHOICE_COUNT_FIELD_NAME = "choiceCount";
/**
* Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link
* Cluster}.
*
* @throws ResourceInvalidException If the {@link Cluster} has an invalid LB configuration.
*/
static ImmutableMap<String, ?> newConfig(Cluster cluster, boolean enableLeastRequest)
throws ResourceInvalidException {
switch (cluster.getLbPolicy()) {
case ROUND_ROBIN:
return newRoundRobinConfig();
case RING_HASH:
return newRingHashConfig(cluster);
case LEAST_REQUEST:
if (enableLeastRequest) {
return newLeastRequestConfig(cluster);
}
break;
default:
}
throw new ResourceInvalidException(
"Cluster " + cluster.getName() + ": unsupported lb policy: " + cluster.getLbPolicy());
}
// Builds an empty configuration for round robin (it is not configurable).
private static ImmutableMap<String, ?> newRoundRobinConfig() {
return ImmutableMap.of(ROUND_ROBIN_FIELD_NAME, ImmutableMap.of());
}
// Builds a ring hash config and validates the hash function selection.
private static ImmutableMap<String, ?> newRingHashConfig(Cluster cluster)
throws ResourceInvalidException {
RingHashLbConfig lbConfig = cluster.getRingHashLbConfig();
// The hash function needs to be validated here as it is not exposed in the returned
// configuration for later validation.
if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH) {
throw new ResourceInvalidException(
"Cluster " + cluster.getName() + ": invalid ring hash function: " + lbConfig);
}
ImmutableMap.Builder<String, Object> configBuilder = ImmutableMap.builder();
if (lbConfig.hasMinimumRingSize()) {
configBuilder.put(MIN_RING_SIZE_FIELD_NAME,
((Long) lbConfig.getMinimumRingSize().getValue()).doubleValue());
}
if (lbConfig.hasMaximumRingSize()) {
configBuilder.put(MAX_RING_SIZE_FIELD_NAME,
((Long) lbConfig.getMaximumRingSize().getValue()).doubleValue());
}
return ImmutableMap.of(RING_HASH_FIELD_NAME, configBuilder.build());
}
// Builds a new least request config.
private static ImmutableMap<String, ?> newLeastRequestConfig(Cluster cluster) {
LeastRequestLbConfig lbConfig = cluster.getLeastRequestLbConfig();
ImmutableMap.Builder<String, Object> configBuilder = ImmutableMap.builder();
if (lbConfig.hasChoiceCount()) {
configBuilder.put(CHOICE_COUNT_FIELD_NAME,
((Integer) lbConfig.getChoiceCount().getValue()).doubleValue());
}
return ImmutableMap.of(LEAST_REQUEST_FIELD_NAME, configBuilder.build());
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.UrlEscapers;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
@ -178,8 +179,7 @@ abstract class XdsClient {
abstract ClusterType clusterType();
// Endpoint-level load balancing policy.
abstract LbPolicy lbPolicy();
abstract ImmutableMap<String, ?> lbPolicyConfig();
// Only valid if lbPolicy is "ring_hash_experimental".
abstract long minRingSize();
@ -276,7 +276,7 @@ abstract class XdsClient {
return MoreObjects.toStringHelper(this)
.add("clusterName", clusterName())
.add("clusterType", clusterType())
.add("lbPolicy", lbPolicy())
.add("lbPolicyConfig", lbPolicyConfig())
.add("minRingSize", minRingSize())
.add("maxRingSize", maxRingSize())
.add("choiceCount", choiceCount())
@ -297,19 +297,21 @@ abstract class XdsClient {
// Private, use one of the static factory methods instead.
protected abstract Builder clusterType(ClusterType clusterType);
// Private, use roundRobinLbPolicy() or ringHashLbPolicy(long, long).
protected abstract Builder lbPolicy(LbPolicy lbPolicy);
protected abstract Builder lbPolicyConfig(ImmutableMap<String, ?> lbPolicyConfig);
Builder roundRobinLbPolicy() {
return this.lbPolicy(LbPolicy.ROUND_ROBIN);
return this.lbPolicyConfig(ImmutableMap.of("round_robin", ImmutableMap.of()));
}
Builder ringHashLbPolicy(long minRingSize, long maxRingSize) {
return this.lbPolicy(LbPolicy.RING_HASH).minRingSize(minRingSize).maxRingSize(maxRingSize);
Builder ringHashLbPolicy(Long minRingSize, Long maxRingSize) {
return this.lbPolicyConfig(ImmutableMap.of("ring_hash_experimental",
ImmutableMap.of("minRingSize", minRingSize.doubleValue(), "maxRingSize",
maxRingSize.doubleValue())));
}
Builder leastRequestLbPolicy(int choiceCount) {
return this.lbPolicy(LbPolicy.LEAST_REQUEST).choiceCount(choiceCount);
Builder leastRequestLbPolicy(Integer choiceCount) {
return this.lbPolicyConfig(ImmutableMap.of("least_request_experimental",
ImmutableMap.of("choiceCount", choiceCount.doubleValue())));
}
// Private, use leastRequestLbPolicy(int).

View File

@ -18,6 +18,7 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@ -25,6 +26,7 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
@ -39,6 +41,7 @@ import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
@ -87,7 +90,8 @@ public class CdsLoadBalancer2Test {
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
throw new RuntimeException(e);
//throw new AssertionError(e);
}
});
private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
@ -121,8 +125,10 @@ public class CdsLoadBalancer2Test {
when(helper.getSynchronizationContext()).thenReturn(syncContext);
lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_RESOLVER_POLICY_NAME));
lbRegistry.register(new FakeLoadBalancerProvider("round_robin"));
lbRegistry.register(new FakeLoadBalancerProvider("ring_hash_experimental"));
lbRegistry.register(new FakeLoadBalancerProvider("least_request_experimental"));
lbRegistry.register(
new FakeLoadBalancerProvider("ring_hash_experimental", new RingHashLoadBalancerProvider()));
lbRegistry.register(new FakeLoadBalancerProvider("least_request_experimental",
new LeastRequestLoadBalancerProvider()));
loadBalancer = new CdsLoadBalancer2(helper, lbRegistry);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
@ -513,6 +519,34 @@ public class CdsLoadBalancer2Test {
any(ConnectivityState.class), any(SubchannelPicker.class));
}
@Test
public void unknownLbProvider() {
try {
xdsClient.deliverCdsUpdate(CLUSTER,
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext)
.lbPolicyConfig(ImmutableMap.of("unknown", ImmutableMap.of("foo", "bar"))).build());
} catch (Exception e) {
assertThat(e).hasCauseThat().hasMessageThat().contains("No provider available");
return;
}
fail("Expected the unknown LB to cause an exception");
}
@Test
public void invalidLbConfig() {
try {
xdsClient.deliverCdsUpdate(CLUSTER,
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext)
.lbPolicyConfig(
ImmutableMap.of("ring_hash_experimental", ImmutableMap.of("minRingSize", "-1")))
.build());
} catch (Exception e) {
assertThat(e).hasCauseThat().hasMessageThat().contains("Unable to parse");
return;
}
fail("Expected the invalid config to casue an exception");
}
private static void assertPicker(SubchannelPicker picker, Status expectedStatus,
@Nullable Subchannel expectedSubchannel) {
PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class));
@ -539,9 +573,15 @@ public class CdsLoadBalancer2Test {
private final class FakeLoadBalancerProvider extends LoadBalancerProvider {
private final String policyName;
private final LoadBalancerProvider configParsingDelegate;
FakeLoadBalancerProvider(String policyName) {
this(policyName, null);
}
FakeLoadBalancerProvider(String policyName, LoadBalancerProvider configParsingDelegate) {
this.policyName = policyName;
this.configParsingDelegate = configParsingDelegate;
}
@Override
@ -565,6 +605,15 @@ public class CdsLoadBalancer2Test {
public String getPolicyName() {
return policyName;
}
@Override
public NameResolver.ConfigOrError parseLoadBalancingPolicyConfig(
Map<String, ?> rawLoadBalancingPolicyConfig) {
if (configParsingDelegate != null) {
return configParsingDelegate.parseLoadBalancingPolicyConfig(rawLoadBalancingPolicyConfig);
}
return super.parseLoadBalancingPolicyConfig(rawLoadBalancingPolicyConfig);
}
}
private final class FakeLoadBalancer extends LoadBalancer {

View File

@ -30,7 +30,6 @@ import com.google.protobuf.Message;
import com.google.protobuf.StringValue;
import com.google.protobuf.Struct;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import com.google.protobuf.Value;
import com.google.protobuf.util.Durations;
import com.google.re2j.Pattern;
@ -38,9 +37,6 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig.HashFunction;
import io.envoyproxy.envoy.config.core.v3.Address;
import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource;
import io.envoyproxy.envoy.config.core.v3.CidrRange;
@ -106,7 +102,10 @@ import io.envoyproxy.envoy.type.v3.Int64Range;
import io.grpc.ClientInterceptor;
import io.grpc.InsecureChannelCredentials;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerRegistry;
import io.grpc.Status.Code;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.lookup.v1.GrpcKeyBuilder;
import io.grpc.lookup.v1.GrpcKeyBuilder.Name;
import io.grpc.lookup.v1.NameMatcher;
@ -1734,12 +1733,10 @@ public class ClientXdsClientDataTest {
.build();
CdsUpdate update = ClientXdsClient.processCluster(
cluster, new HashSet<String>(), null, LRS_SERVER_INFO);
assertThat(update.lbPolicy()).isEqualTo(CdsUpdate.LbPolicy.RING_HASH);
assertThat(update.minRingSize())
.isEqualTo(ClientXdsClient.DEFAULT_RING_HASH_LB_POLICY_MIN_RING_SIZE);
assertThat(update.maxRingSize())
.isEqualTo(ClientXdsClient.DEFAULT_RING_HASH_LB_POLICY_MAX_RING_SIZE);
cluster, new HashSet<String>(), null, LRS_SERVER_INFO,
LoadBalancerRegistry.getDefaultRegistry());
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(update.lbPolicyConfig());
assertThat(lbConfig.getPolicyName()).isEqualTo("ring_hash_experimental");
}
@Test
@ -1758,10 +1755,10 @@ public class ClientXdsClientDataTest {
.build();
CdsUpdate update = ClientXdsClient.processCluster(
cluster, new HashSet<String>(), null, LRS_SERVER_INFO);
assertThat(update.lbPolicy()).isEqualTo(CdsUpdate.LbPolicy.LEAST_REQUEST);
assertThat(update.choiceCount())
.isEqualTo(ClientXdsClient.DEFAULT_LEAST_REQUEST_CHOICE_COUNT);
cluster, new HashSet<String>(), null, LRS_SERVER_INFO,
LoadBalancerRegistry.getDefaultRegistry());
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(update.lbPolicyConfig());
assertThat(lbConfig.getPolicyName()).isEqualTo("least_request_experimental");
}
@Test
@ -1783,84 +1780,8 @@ public class ClientXdsClientDataTest {
thrown.expect(ResourceInvalidException.class);
thrown.expectMessage(
"Cluster cluster-foo.googleapis.com: transport-socket-matches not supported.");
ClientXdsClient.processCluster(cluster, new HashSet<String>(), null, LRS_SERVER_INFO);
}
@Test
public void parseCluster_ringHashLbPolicy_invalidRingSizeConfig_minGreaterThanMax()
throws ResourceInvalidException {
Cluster cluster = Cluster.newBuilder()
.setName("cluster-foo.googleapis.com")
.setType(DiscoveryType.EDS)
.setEdsClusterConfig(
EdsClusterConfig.newBuilder()
.setEdsConfig(
ConfigSource.newBuilder()
.setAds(AggregatedConfigSource.getDefaultInstance()))
.setServiceName("service-foo.googleapis.com"))
.setLbPolicy(LbPolicy.RING_HASH)
.setRingHashLbConfig(
RingHashLbConfig.newBuilder()
.setHashFunction(HashFunction.XX_HASH)
.setMinimumRingSize(UInt64Value.newBuilder().setValue(1000L))
.setMaximumRingSize(UInt64Value.newBuilder().setValue(100L)))
.build();
thrown.expect(ResourceInvalidException.class);
thrown.expectMessage("Cluster cluster-foo.googleapis.com: invalid ring_hash_lb_config");
ClientXdsClient.processCluster(cluster, new HashSet<String>(), null, LRS_SERVER_INFO);
}
@Test
public void parseCluster_ringHashLbPolicy_invalidRingSizeConfig_tooLargeRingSize()
throws ResourceInvalidException {
Cluster cluster = Cluster.newBuilder()
.setName("cluster-foo.googleapis.com")
.setType(DiscoveryType.EDS)
.setEdsClusterConfig(
EdsClusterConfig.newBuilder()
.setEdsConfig(
ConfigSource.newBuilder()
.setAds(AggregatedConfigSource.getDefaultInstance()))
.setServiceName("service-foo.googleapis.com"))
.setLbPolicy(LbPolicy.RING_HASH)
.setRingHashLbConfig(
RingHashLbConfig.newBuilder()
.setHashFunction(HashFunction.XX_HASH)
.setMinimumRingSize(UInt64Value.newBuilder().setValue(1000L))
.setMaximumRingSize(
UInt64Value.newBuilder()
.setValue(ClientXdsClient.MAX_RING_HASH_LB_POLICY_RING_SIZE + 1)))
.build();
thrown.expect(ResourceInvalidException.class);
thrown.expectMessage("Cluster cluster-foo.googleapis.com: invalid ring_hash_lb_config");
ClientXdsClient.processCluster(cluster, new HashSet<String>(), null, LRS_SERVER_INFO);
}
@Test
public void parseCluster_leastRequestLbPolicy_invalidChoiceCountConfig_tooSmallChoiceCount()
throws ResourceInvalidException {
ClientXdsClient.enableLeastRequest = true;
Cluster cluster = Cluster.newBuilder()
.setName("cluster-foo.googleapis.com")
.setType(DiscoveryType.EDS)
.setEdsClusterConfig(
EdsClusterConfig.newBuilder()
.setEdsConfig(
ConfigSource.newBuilder()
.setAds(AggregatedConfigSource.getDefaultInstance()))
.setServiceName("service-foo.googleapis.com"))
.setLbPolicy(LbPolicy.LEAST_REQUEST)
.setLeastRequestLbConfig(
LeastRequestLbConfig.newBuilder()
.setChoiceCount(UInt32Value.newBuilder().setValue(1))
)
.build();
thrown.expect(ResourceInvalidException.class);
thrown.expectMessage("Cluster cluster-foo.googleapis.com: invalid least_request_lb_config");
ClientXdsClient.processCluster(cluster, new HashSet<String>(), null, LRS_SERVER_INFO);
ClientXdsClient.processCluster(cluster, new HashSet<String>(), null, LRS_SERVER_INFO,
LoadBalancerRegistry.getDefaultRegistry());
}
@Test
@ -1877,7 +1798,8 @@ public class ClientXdsClientDataTest {
.setServiceName("service-foo.googleapis.com"))
.setLbPolicy(LbPolicy.ROUND_ROBIN)
.build();
ClientXdsClient.processCluster(cluster1, retainedEdsResources, null, LRS_SERVER_INFO);
ClientXdsClient.processCluster(cluster1, retainedEdsResources, null, LRS_SERVER_INFO,
LoadBalancerRegistry.getDefaultRegistry());
Cluster cluster2 = Cluster.newBuilder()
.setName("cluster-foo.googleapis.com")
@ -1890,7 +1812,8 @@ public class ClientXdsClientDataTest {
.setServiceName("service-foo.googleapis.com"))
.setLbPolicy(LbPolicy.ROUND_ROBIN)
.build();
ClientXdsClient.processCluster(cluster2, retainedEdsResources, null, LRS_SERVER_INFO);
ClientXdsClient.processCluster(cluster2, retainedEdsResources, null, LRS_SERVER_INFO,
LoadBalancerRegistry.getDefaultRegistry());
Cluster cluster3 = Cluster.newBuilder()
.setName("cluster-foo.googleapis.com")
@ -1908,7 +1831,8 @@ public class ClientXdsClientDataTest {
thrown.expectMessage(
"Cluster cluster-foo.googleapis.com: field eds_cluster_config must be set to indicate to"
+ " use EDS over ADS or self ConfigSource");
ClientXdsClient.processCluster(cluster3, retainedEdsResources, null, LRS_SERVER_INFO);
ClientXdsClient.processCluster(cluster3, retainedEdsResources, null, LRS_SERVER_INFO,
LoadBalancerRegistry.getDefaultRegistry());
}
@Test

View File

@ -23,6 +23,7 @@ import static io.grpc.xds.AbstractXdsClient.ResourceType.EDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.LDS;
import static io.grpc.xds.AbstractXdsClient.ResourceType.RDS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -56,6 +57,9 @@ import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.internal.FakeClock.ScheduledTask;
import io.grpc.internal.FakeClock.TaskFilter;
import io.grpc.internal.JsonUtil;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.internal.TimeProvider;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.AbstractXdsClient.ResourceType;
@ -73,7 +77,6 @@ import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy;
import io.grpc.xds.XdsClient.EdsResourceWatcher;
import io.grpc.xds.XdsClient.EdsUpdate;
import io.grpc.xds.XdsClient.LdsResourceWatcher;
@ -1625,7 +1628,8 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
.getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1647,7 +1651,8 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
.getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1674,8 +1679,9 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.LEAST_REQUEST);
assertThat(cdsUpdate.choiceCount()).isEqualTo(3);
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
assertThat(lbConfig.getPolicyName()).isEqualTo("least_request_experimental");
assertThat(lbConfig.getRawConfigValue().get("choiceCount")).isEqualTo(3);
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1701,9 +1707,12 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.RING_HASH);
assertThat(cdsUpdate.minRingSize()).isEqualTo(10L);
assertThat(cdsUpdate.maxRingSize()).isEqualTo(100L);
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
assertThat(lbConfig.getPolicyName()).isEqualTo("ring_hash_experimental");
assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "minRingSize")).isEqualTo(
10L);
assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "maxRingSize")).isEqualTo(
100L);
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1727,7 +1736,8 @@ public abstract class ClientXdsClientTestBase {
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.AGGREGATE);
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
.getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.prioritizedClusterNames()).containsExactlyElementsIn(candidates).inOrder();
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, clusterAggregate, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
@ -1748,7 +1758,8 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
.getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isEqualTo(200L);
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1892,7 +1903,8 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
.getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1934,7 +1946,8 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS);
assertThat(cdsUpdate.dnsHostName()).isEqualTo(dnsHostAddr + ":" + dnsHostPort);
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
.getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1953,7 +1966,8 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
.getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -1961,6 +1975,53 @@ public abstract class ClientXdsClientTestBase {
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
}
// Assures that CDS updates identical to the current config are ignored.
@Test
public void cdsResourceUpdatedWithDuplicate() {
DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
String edsService = "eds-service-bar.googleapis.com";
String transportSocketName = "envoy.transport_sockets.tls";
Any roundRobinConfig = Any.pack(
mf.buildEdsCluster(CDS_RESOURCE, edsService, "round_robin", null, null, true, null,
transportSocketName, null
));
Any ringHashConfig = Any.pack(
mf.buildEdsCluster(CDS_RESOURCE, edsService, "ring_hash_experimental",
mf.buildRingHashLbConfig("xx_hash", 1, 2), null, true, null,
transportSocketName, null
));
Any leastRequestConfig = Any.pack(
mf.buildEdsCluster(CDS_RESOURCE, edsService, "least_request_experimental",
null, mf.buildLeastRequestLbConfig(2), true, null,
transportSocketName, null
));
// Configure with round robin, the update should be sent to the watcher.
call.sendResponse(CDS, roundRobinConfig, VERSION_2, "0001");
verify(cdsResourceWatcher, times(1)).onChanged(isA(CdsUpdate.class));
// Second update is identical, watcher should not get an additional update.
call.sendResponse(CDS, roundRobinConfig, VERSION_2, "0002");
verify(cdsResourceWatcher, times(1)).onChanged(isA(CdsUpdate.class));
// Now we switch to ring hash so the watcher should be notified.
call.sendResponse(CDS, ringHashConfig, VERSION_2, "0003");
verify(cdsResourceWatcher, times(2)).onChanged(isA(CdsUpdate.class));
// Second update to ring hash should not result in watcher being notified.
call.sendResponse(CDS, ringHashConfig, VERSION_2, "0004");
verify(cdsResourceWatcher, times(2)).onChanged(isA(CdsUpdate.class));
// Now we switch to least request so the watcher should be notified.
call.sendResponse(CDS, leastRequestConfig, VERSION_2, "0005");
verify(cdsResourceWatcher, times(3)).onChanged(isA(CdsUpdate.class));
// Second update to least request should not result in watcher being notified.
call.sendResponse(CDS, leastRequestConfig, VERSION_2, "0006");
verify(cdsResourceWatcher, times(3)).onChanged(isA(CdsUpdate.class));
}
@Test
public void cdsResourceDeleted() {
DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
@ -1974,7 +2035,8 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
.getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -2026,7 +2088,8 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS);
assertThat(cdsUpdate.dnsHostName()).isEqualTo(dnsHostAddr + ":" + dnsHostPort);
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
.getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -2035,7 +2098,8 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
.getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
@ -2044,7 +2108,8 @@ public abstract class ClientXdsClientTestBase {
assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService);
assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN);
assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig())
.getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();

View File

@ -0,0 +1,115 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig.HashFunction;
import io.grpc.internal.JsonUtil;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.xds.ClientXdsClient.ResourceInvalidException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Unit test for {@link LegacyLoadBalancerConfigFactory}.
*/
@RunWith(JUnit4.class)
public class LegacyLoadBalancerConfigFactoryTest {
@Test
public void roundRobin() throws ResourceInvalidException {
Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.ROUND_ROBIN).build();
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(
LegacyLoadBalancerConfigFactory.newConfig(cluster, true));
assertThat(lbConfig.getPolicyName()).isEqualTo("round_robin");
assertThat(lbConfig.getRawConfigValue()).isEmpty();
}
@Test
public void ringHash() throws ResourceInvalidException {
Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.RING_HASH).setRingHashLbConfig(
RingHashLbConfig.newBuilder()
.setMinimumRingSize(UInt64Value.newBuilder().setValue(1).build())
.setMaximumRingSize(UInt64Value.newBuilder().setValue(2).build()).build()).build();
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(
LegacyLoadBalancerConfigFactory.newConfig(cluster, true));
assertThat(lbConfig.getPolicyName()).isEqualTo("ring_hash_experimental");
assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "minRingSize")).isEqualTo(1);
assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "maxRingSize")).isEqualTo(2);
}
@Test
public void ringHash_invalidHash() throws ResourceInvalidException {
Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.RING_HASH).setRingHashLbConfig(
RingHashLbConfig.newBuilder().setHashFunction(HashFunction.MURMUR_HASH_2)).build();
try {
ServiceConfigUtil.unwrapLoadBalancingConfig(
LegacyLoadBalancerConfigFactory.newConfig(cluster, true));
} catch (ResourceInvalidException e) {
assertThat(e).hasMessageThat().contains("invalid ring hash function");
return;
}
fail("ResourceInvalidException not thrown");
}
@Test
public void leastRequest() throws ResourceInvalidException {
System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "true");
Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.LEAST_REQUEST)
.setLeastRequestLbConfig(LeastRequestLbConfig.newBuilder()
.setChoiceCount(UInt32Value.newBuilder().setValue(10).build())).build();
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(
LegacyLoadBalancerConfigFactory.newConfig(cluster, true));
assertThat(lbConfig.getPolicyName()).isEqualTo("least_request_experimental");
assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "choiceCount")).isEqualTo(10);
}
@Test
public void leastRequest_notEnabled() throws ResourceInvalidException {
System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "false");
Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.LEAST_REQUEST).build();
try {
ServiceConfigUtil.unwrapLoadBalancingConfig(
LegacyLoadBalancerConfigFactory.newConfig(cluster, false));
} catch (ResourceInvalidException e) {
assertThat(e).hasMessageThat().contains("unsupported lb policy");
return;
}
fail("ResourceInvalidException not thrown");
}
}