diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 2dcc51ad8a..2eada2ee07 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -25,19 +25,19 @@ import io.grpc.InternalLogId; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; +import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.internal.ObjectPool; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; -import io.grpc.xds.LeastRequestLoadBalancer.LeastRequestConfig; -import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsUpdate; import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; -import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy; import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayDeque; @@ -185,22 +185,27 @@ final class CdsLoadBalancer2 extends LoadBalancer { helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable)); return; } - LoadBalancerProvider lbProvider = null; - Object lbConfig = null; - if (root.result.lbPolicy() == LbPolicy.RING_HASH) { - lbProvider = lbRegistry.getProvider("ring_hash_experimental"); - lbConfig = new RingHashConfig(root.result.minRingSize(), root.result.maxRingSize()); - } - if (root.result.lbPolicy() == LbPolicy.LEAST_REQUEST) { - lbProvider = lbRegistry.getProvider("least_request_experimental"); - lbConfig = new LeastRequestConfig(root.result.choiceCount()); - } + + // The LB policy config is provided in service_config.proto/JSON format. It is unwrapped + // to determine the name of the policy in the load balancer registry. + LbConfig unwrappedLbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig( + root.result.lbPolicyConfig()); + LoadBalancerProvider lbProvider = lbRegistry.getProvider(unwrappedLbConfig.getPolicyName()); if (lbProvider == null) { - lbProvider = lbRegistry.getProvider("round_robin"); - lbConfig = null; + throw NameResolver.ConfigOrError.fromError(Status.INVALID_ARGUMENT.withDescription( + "No provider available for LB: " + unwrappedLbConfig.getPolicyName())).getError() + .asRuntimeException(); } + NameResolver.ConfigOrError configOrError = lbProvider.parseLoadBalancingPolicyConfig( + unwrappedLbConfig.getRawConfigValue()); + if (configOrError.getError() != null) { + throw configOrError.getError().augmentDescription("Unable to parse the LB config") + .asRuntimeException(); + } + ClusterResolverConfig config = new ClusterResolverConfig( - Collections.unmodifiableList(instances), new PolicySelection(lbProvider, lbConfig)); + Collections.unmodifiableList(instances), + new PolicySelection(lbProvider, configOrError.getConfig())); if (childLb == null) { childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper); } diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index d6a83374a5..3202bb703a 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -43,9 +43,6 @@ import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.cluster.v3.Cluster.CustomClusterType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig; import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions; import io.envoyproxy.envoy.config.core.v3.RoutingPriority; import io.envoyproxy.envoy.config.core.v3.SocketAddress; @@ -70,12 +67,16 @@ import io.grpc.Context; import io.grpc.EquivalentAddressGroup; import io.grpc.Grpc; import io.grpc.InternalLogId; +import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; +import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.internal.TimeProvider; import io.grpc.xds.AbstractXdsClient.ResourceType; import io.grpc.xds.Bootstrapper.AuthorityInfo; @@ -137,14 +138,6 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15; private static final String TRANSPORT_SOCKET_NAME_TLS = "envoy.transport_sockets.tls"; @VisibleForTesting - static final long DEFAULT_RING_HASH_LB_POLICY_MIN_RING_SIZE = 1024L; - @VisibleForTesting - static final long DEFAULT_RING_HASH_LB_POLICY_MAX_RING_SIZE = 8 * 1024 * 1024L; - @VisibleForTesting - static final int DEFAULT_LEAST_REQUEST_CHOICE_COUNT = 2; - @VisibleForTesting - static final long MAX_RING_HASH_LB_POLICY_RING_SIZE = 8 * 1024 * 1024L; - @VisibleForTesting static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate"; @VisibleForTesting static final String HASH_POLICY_FILTER_STATE_KEY = "io.grpc.channel_id"; @@ -211,6 +204,8 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res } }); private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry(); + private final LoadBalancerRegistry loadBalancerRegistry + = LoadBalancerRegistry.getDefaultRegistry(); private final Map serverChannelMap = new HashMap<>(); private final Map ldsResourceSubscribers = new HashMap<>(); private final Map rdsResourceSubscribers = new HashMap<>(); @@ -1598,8 +1593,8 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res if (getBootstrapInfo() != null && getBootstrapInfo().certProviders() != null) { certProviderInstances = getBootstrapInfo().certProviders().keySet(); } - cdsUpdate = - processCluster(cluster, retainedEdsResources, certProviderInstances, serverInfo); + cdsUpdate = processCluster(cluster, retainedEdsResources, certProviderInstances, serverInfo, + loadBalancerRegistry); } catch (ResourceInvalidException e) { errors.add( "CDS response Cluster '" + clusterName + "' validation error: " + e.getMessage()); @@ -1618,7 +1613,8 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res @VisibleForTesting static CdsUpdate processCluster(Cluster cluster, Set retainedEdsResources, - Set certProviderInstances, ServerInfo serverInfo) + Set certProviderInstances, ServerInfo serverInfo, + LoadBalancerRegistry loadBalancerRegistry) throws ResourceInvalidException { StructOrError structOrError; switch (cluster.getClusterDiscoveryTypeCase()) { @@ -1639,41 +1635,22 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res } CdsUpdate.Builder updateBuilder = structOrError.getStruct(); - if (cluster.getLbPolicy() == LbPolicy.RING_HASH) { - RingHashLbConfig lbConfig = cluster.getRingHashLbConfig(); - long minRingSize = - lbConfig.hasMinimumRingSize() - ? lbConfig.getMinimumRingSize().getValue() - : DEFAULT_RING_HASH_LB_POLICY_MIN_RING_SIZE; - long maxRingSize = - lbConfig.hasMaximumRingSize() - ? lbConfig.getMaximumRingSize().getValue() - : DEFAULT_RING_HASH_LB_POLICY_MAX_RING_SIZE; - if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH - || minRingSize > maxRingSize - || maxRingSize > MAX_RING_HASH_LB_POLICY_RING_SIZE) { - throw new ResourceInvalidException( - "Cluster " + cluster.getName() + ": invalid ring_hash_lb_config: " + lbConfig); - } - updateBuilder.ringHashLbPolicy(minRingSize, maxRingSize); - } else if (cluster.getLbPolicy() == LbPolicy.ROUND_ROBIN) { - updateBuilder.roundRobinLbPolicy(); - } else if (enableLeastRequest && cluster.getLbPolicy() == LbPolicy.LEAST_REQUEST) { - LeastRequestLbConfig lbConfig = cluster.getLeastRequestLbConfig(); - int choiceCount = - lbConfig.hasChoiceCount() - ? lbConfig.getChoiceCount().getValue() - : DEFAULT_LEAST_REQUEST_CHOICE_COUNT; - if (choiceCount < DEFAULT_LEAST_REQUEST_CHOICE_COUNT) { - throw new ResourceInvalidException( - "Cluster " + cluster.getName() + ": invalid least_request_lb_config: " + lbConfig); - } - updateBuilder.leastRequestLbPolicy(choiceCount); - } else { - throw new ResourceInvalidException( - "Cluster " + cluster.getName() + ": unsupported lb policy: " + cluster.getLbPolicy()); + // TODO: If load_balancing_policy is set in Cluster use it for LB config, otherwise fall back + // to using the legacy lb_policy field. + ImmutableMap lbPolicyConfig = LegacyLoadBalancerConfigFactory.newConfig(cluster, + enableLeastRequest); + + // Validate the LB config by trying to parse it with the corresponding LB provider. + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(lbPolicyConfig); + NameResolver.ConfigOrError configOrError = loadBalancerRegistry.getProvider( + lbConfig.getPolicyName()).parseLoadBalancingPolicyConfig( + lbConfig.getRawConfigValue()); + if (configOrError.getError() != null) { + throw new ResourceInvalidException(structOrError.getErrorDetail()); } + updateBuilder.lbPolicyConfig(lbPolicyConfig); + return updateBuilder.build(); } @@ -2580,11 +2557,11 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res static final class ResourceInvalidException extends Exception { private static final long serialVersionUID = 0L; - private ResourceInvalidException(String message) { + ResourceInvalidException(String message) { super(message, null, false, false); } - private ResourceInvalidException(String message, Throwable cause) { + ResourceInvalidException(String message, Throwable cause) { super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false); } } diff --git a/xds/src/main/java/io/grpc/xds/LegacyLoadBalancerConfigFactory.java b/xds/src/main/java/io/grpc/xds/LegacyLoadBalancerConfigFactory.java new file mode 100644 index 0000000000..45a73b1e1f --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/LegacyLoadBalancerConfigFactory.java @@ -0,0 +1,105 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import com.google.common.collect.ImmutableMap; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig; +import io.grpc.xds.ClientXdsClient.ResourceInvalidException; + +/** + * Builds a JSON LB configuration based on the old style of using the xDS Cluster proto message. The + * lb_policy field is used to select the policy and configuration is extracted from various policy + * specific fields in Cluster. + */ +abstract class LegacyLoadBalancerConfigFactory { + + static final String ROUND_ROBIN_FIELD_NAME = "round_robin"; + + static final String RING_HASH_FIELD_NAME = "ring_hash_experimental"; + static final String MIN_RING_SIZE_FIELD_NAME = "minRingSize"; + static final String MAX_RING_SIZE_FIELD_NAME = "maxRingSize"; + + static final String LEAST_REQUEST_FIELD_NAME = "least_request_experimental"; + static final String CHOICE_COUNT_FIELD_NAME = "choiceCount"; + + /** + * Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link + * Cluster}. + * + * @throws ResourceInvalidException If the {@link Cluster} has an invalid LB configuration. + */ + static ImmutableMap newConfig(Cluster cluster, boolean enableLeastRequest) + throws ResourceInvalidException { + switch (cluster.getLbPolicy()) { + case ROUND_ROBIN: + return newRoundRobinConfig(); + case RING_HASH: + return newRingHashConfig(cluster); + case LEAST_REQUEST: + if (enableLeastRequest) { + return newLeastRequestConfig(cluster); + } + break; + default: + } + throw new ResourceInvalidException( + "Cluster " + cluster.getName() + ": unsupported lb policy: " + cluster.getLbPolicy()); + } + + // Builds an empty configuration for round robin (it is not configurable). + private static ImmutableMap newRoundRobinConfig() { + return ImmutableMap.of(ROUND_ROBIN_FIELD_NAME, ImmutableMap.of()); + } + + // Builds a ring hash config and validates the hash function selection. + private static ImmutableMap newRingHashConfig(Cluster cluster) + throws ResourceInvalidException { + RingHashLbConfig lbConfig = cluster.getRingHashLbConfig(); + + // The hash function needs to be validated here as it is not exposed in the returned + // configuration for later validation. + if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH) { + throw new ResourceInvalidException( + "Cluster " + cluster.getName() + ": invalid ring hash function: " + lbConfig); + } + + ImmutableMap.Builder configBuilder = ImmutableMap.builder(); + if (lbConfig.hasMinimumRingSize()) { + configBuilder.put(MIN_RING_SIZE_FIELD_NAME, + ((Long) lbConfig.getMinimumRingSize().getValue()).doubleValue()); + } + if (lbConfig.hasMaximumRingSize()) { + configBuilder.put(MAX_RING_SIZE_FIELD_NAME, + ((Long) lbConfig.getMaximumRingSize().getValue()).doubleValue()); + } + return ImmutableMap.of(RING_HASH_FIELD_NAME, configBuilder.build()); + } + + // Builds a new least request config. + private static ImmutableMap newLeastRequestConfig(Cluster cluster) { + LeastRequestLbConfig lbConfig = cluster.getLeastRequestLbConfig(); + + ImmutableMap.Builder configBuilder = ImmutableMap.builder(); + if (lbConfig.hasChoiceCount()) { + configBuilder.put(CHOICE_COUNT_FIELD_NAME, + ((Integer) lbConfig.getChoiceCount().getValue()).doubleValue()); + } + return ImmutableMap.of(LEAST_REQUEST_FIELD_NAME, configBuilder.build()); + } +} diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 1124466e25..bdffc36119 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -24,6 +24,7 @@ import com.google.common.base.Joiner; import com.google.common.base.MoreObjects; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.net.UrlEscapers; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Any; @@ -178,8 +179,7 @@ abstract class XdsClient { abstract ClusterType clusterType(); - // Endpoint-level load balancing policy. - abstract LbPolicy lbPolicy(); + abstract ImmutableMap lbPolicyConfig(); // Only valid if lbPolicy is "ring_hash_experimental". abstract long minRingSize(); @@ -276,7 +276,7 @@ abstract class XdsClient { return MoreObjects.toStringHelper(this) .add("clusterName", clusterName()) .add("clusterType", clusterType()) - .add("lbPolicy", lbPolicy()) + .add("lbPolicyConfig", lbPolicyConfig()) .add("minRingSize", minRingSize()) .add("maxRingSize", maxRingSize()) .add("choiceCount", choiceCount()) @@ -297,19 +297,21 @@ abstract class XdsClient { // Private, use one of the static factory methods instead. protected abstract Builder clusterType(ClusterType clusterType); - // Private, use roundRobinLbPolicy() or ringHashLbPolicy(long, long). - protected abstract Builder lbPolicy(LbPolicy lbPolicy); + protected abstract Builder lbPolicyConfig(ImmutableMap lbPolicyConfig); Builder roundRobinLbPolicy() { - return this.lbPolicy(LbPolicy.ROUND_ROBIN); + return this.lbPolicyConfig(ImmutableMap.of("round_robin", ImmutableMap.of())); } - Builder ringHashLbPolicy(long minRingSize, long maxRingSize) { - return this.lbPolicy(LbPolicy.RING_HASH).minRingSize(minRingSize).maxRingSize(maxRingSize); + Builder ringHashLbPolicy(Long minRingSize, Long maxRingSize) { + return this.lbPolicyConfig(ImmutableMap.of("ring_hash_experimental", + ImmutableMap.of("minRingSize", minRingSize.doubleValue(), "maxRingSize", + maxRingSize.doubleValue()))); } - Builder leastRequestLbPolicy(int choiceCount) { - return this.lbPolicy(LbPolicy.LEAST_REQUEST).choiceCount(choiceCount); + Builder leastRequestLbPolicy(Integer choiceCount) { + return this.lbPolicyConfig(ImmutableMap.of("least_request_experimental", + ImmutableMap.of("choiceCount", choiceCount.doubleValue()))); } // Private, use leastRequestLbPolicy(int). diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index cd1d33077a..66972a0915 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -18,6 +18,7 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -25,6 +26,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import io.grpc.Attributes; import io.grpc.ConnectivityState; @@ -39,6 +41,7 @@ import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; +import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.SynchronizationContext; @@ -87,7 +90,8 @@ public class CdsLoadBalancer2Test { new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { - throw new AssertionError(e); + throw new RuntimeException(e); + //throw new AssertionError(e); } }); private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry(); @@ -121,8 +125,10 @@ public class CdsLoadBalancer2Test { when(helper.getSynchronizationContext()).thenReturn(syncContext); lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_RESOLVER_POLICY_NAME)); lbRegistry.register(new FakeLoadBalancerProvider("round_robin")); - lbRegistry.register(new FakeLoadBalancerProvider("ring_hash_experimental")); - lbRegistry.register(new FakeLoadBalancerProvider("least_request_experimental")); + lbRegistry.register( + new FakeLoadBalancerProvider("ring_hash_experimental", new RingHashLoadBalancerProvider())); + lbRegistry.register(new FakeLoadBalancerProvider("least_request_experimental", + new LeastRequestLoadBalancerProvider())); loadBalancer = new CdsLoadBalancer2(helper, lbRegistry); loadBalancer.handleResolvedAddresses( ResolvedAddresses.newBuilder() @@ -513,6 +519,34 @@ public class CdsLoadBalancer2Test { any(ConnectivityState.class), any(SubchannelPicker.class)); } + @Test + public void unknownLbProvider() { + try { + xdsClient.deliverCdsUpdate(CLUSTER, + CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext) + .lbPolicyConfig(ImmutableMap.of("unknown", ImmutableMap.of("foo", "bar"))).build()); + } catch (Exception e) { + assertThat(e).hasCauseThat().hasMessageThat().contains("No provider available"); + return; + } + fail("Expected the unknown LB to cause an exception"); + } + + @Test + public void invalidLbConfig() { + try { + xdsClient.deliverCdsUpdate(CLUSTER, + CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, 100L, upstreamTlsContext) + .lbPolicyConfig( + ImmutableMap.of("ring_hash_experimental", ImmutableMap.of("minRingSize", "-1"))) + .build()); + } catch (Exception e) { + assertThat(e).hasCauseThat().hasMessageThat().contains("Unable to parse"); + return; + } + fail("Expected the invalid config to casue an exception"); + } + private static void assertPicker(SubchannelPicker picker, Status expectedStatus, @Nullable Subchannel expectedSubchannel) { PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); @@ -539,9 +573,15 @@ public class CdsLoadBalancer2Test { private final class FakeLoadBalancerProvider extends LoadBalancerProvider { private final String policyName; + private final LoadBalancerProvider configParsingDelegate; FakeLoadBalancerProvider(String policyName) { + this(policyName, null); + } + + FakeLoadBalancerProvider(String policyName, LoadBalancerProvider configParsingDelegate) { this.policyName = policyName; + this.configParsingDelegate = configParsingDelegate; } @Override @@ -565,6 +605,15 @@ public class CdsLoadBalancer2Test { public String getPolicyName() { return policyName; } + + @Override + public NameResolver.ConfigOrError parseLoadBalancingPolicyConfig( + Map rawLoadBalancingPolicyConfig) { + if (configParsingDelegate != null) { + return configParsingDelegate.parseLoadBalancingPolicyConfig(rawLoadBalancingPolicyConfig); + } + return super.parseLoadBalancingPolicyConfig(rawLoadBalancingPolicyConfig); + } } private final class FakeLoadBalancer extends LoadBalancer { diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java index c0df1c48b5..0fbb503b87 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java @@ -30,7 +30,6 @@ import com.google.protobuf.Message; import com.google.protobuf.StringValue; import com.google.protobuf.Struct; import com.google.protobuf.UInt32Value; -import com.google.protobuf.UInt64Value; import com.google.protobuf.Value; import com.google.protobuf.util.Durations; import com.google.re2j.Pattern; @@ -38,9 +37,6 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig.HashFunction; import io.envoyproxy.envoy.config.core.v3.Address; import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; import io.envoyproxy.envoy.config.core.v3.CidrRange; @@ -106,7 +102,10 @@ import io.envoyproxy.envoy.type.v3.Int64Range; import io.grpc.ClientInterceptor; import io.grpc.InsecureChannelCredentials; import io.grpc.LoadBalancer; +import io.grpc.LoadBalancerRegistry; import io.grpc.Status.Code; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.lookup.v1.GrpcKeyBuilder; import io.grpc.lookup.v1.GrpcKeyBuilder.Name; import io.grpc.lookup.v1.NameMatcher; @@ -1734,12 +1733,10 @@ public class ClientXdsClientDataTest { .build(); CdsUpdate update = ClientXdsClient.processCluster( - cluster, new HashSet(), null, LRS_SERVER_INFO); - assertThat(update.lbPolicy()).isEqualTo(CdsUpdate.LbPolicy.RING_HASH); - assertThat(update.minRingSize()) - .isEqualTo(ClientXdsClient.DEFAULT_RING_HASH_LB_POLICY_MIN_RING_SIZE); - assertThat(update.maxRingSize()) - .isEqualTo(ClientXdsClient.DEFAULT_RING_HASH_LB_POLICY_MAX_RING_SIZE); + cluster, new HashSet(), null, LRS_SERVER_INFO, + LoadBalancerRegistry.getDefaultRegistry()); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(update.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("ring_hash_experimental"); } @Test @@ -1758,10 +1755,10 @@ public class ClientXdsClientDataTest { .build(); CdsUpdate update = ClientXdsClient.processCluster( - cluster, new HashSet(), null, LRS_SERVER_INFO); - assertThat(update.lbPolicy()).isEqualTo(CdsUpdate.LbPolicy.LEAST_REQUEST); - assertThat(update.choiceCount()) - .isEqualTo(ClientXdsClient.DEFAULT_LEAST_REQUEST_CHOICE_COUNT); + cluster, new HashSet(), null, LRS_SERVER_INFO, + LoadBalancerRegistry.getDefaultRegistry()); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(update.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("least_request_experimental"); } @Test @@ -1783,84 +1780,8 @@ public class ClientXdsClientDataTest { thrown.expect(ResourceInvalidException.class); thrown.expectMessage( "Cluster cluster-foo.googleapis.com: transport-socket-matches not supported."); - ClientXdsClient.processCluster(cluster, new HashSet(), null, LRS_SERVER_INFO); - } - - @Test - public void parseCluster_ringHashLbPolicy_invalidRingSizeConfig_minGreaterThanMax() - throws ResourceInvalidException { - Cluster cluster = Cluster.newBuilder() - .setName("cluster-foo.googleapis.com") - .setType(DiscoveryType.EDS) - .setEdsClusterConfig( - EdsClusterConfig.newBuilder() - .setEdsConfig( - ConfigSource.newBuilder() - .setAds(AggregatedConfigSource.getDefaultInstance())) - .setServiceName("service-foo.googleapis.com")) - .setLbPolicy(LbPolicy.RING_HASH) - .setRingHashLbConfig( - RingHashLbConfig.newBuilder() - .setHashFunction(HashFunction.XX_HASH) - .setMinimumRingSize(UInt64Value.newBuilder().setValue(1000L)) - .setMaximumRingSize(UInt64Value.newBuilder().setValue(100L))) - .build(); - - thrown.expect(ResourceInvalidException.class); - thrown.expectMessage("Cluster cluster-foo.googleapis.com: invalid ring_hash_lb_config"); - ClientXdsClient.processCluster(cluster, new HashSet(), null, LRS_SERVER_INFO); - } - - @Test - public void parseCluster_ringHashLbPolicy_invalidRingSizeConfig_tooLargeRingSize() - throws ResourceInvalidException { - Cluster cluster = Cluster.newBuilder() - .setName("cluster-foo.googleapis.com") - .setType(DiscoveryType.EDS) - .setEdsClusterConfig( - EdsClusterConfig.newBuilder() - .setEdsConfig( - ConfigSource.newBuilder() - .setAds(AggregatedConfigSource.getDefaultInstance())) - .setServiceName("service-foo.googleapis.com")) - .setLbPolicy(LbPolicy.RING_HASH) - .setRingHashLbConfig( - RingHashLbConfig.newBuilder() - .setHashFunction(HashFunction.XX_HASH) - .setMinimumRingSize(UInt64Value.newBuilder().setValue(1000L)) - .setMaximumRingSize( - UInt64Value.newBuilder() - .setValue(ClientXdsClient.MAX_RING_HASH_LB_POLICY_RING_SIZE + 1))) - .build(); - - thrown.expect(ResourceInvalidException.class); - thrown.expectMessage("Cluster cluster-foo.googleapis.com: invalid ring_hash_lb_config"); - ClientXdsClient.processCluster(cluster, new HashSet(), null, LRS_SERVER_INFO); - } - - @Test - public void parseCluster_leastRequestLbPolicy_invalidChoiceCountConfig_tooSmallChoiceCount() - throws ResourceInvalidException { - ClientXdsClient.enableLeastRequest = true; - Cluster cluster = Cluster.newBuilder() - .setName("cluster-foo.googleapis.com") - .setType(DiscoveryType.EDS) - .setEdsClusterConfig( - EdsClusterConfig.newBuilder() - .setEdsConfig( - ConfigSource.newBuilder() - .setAds(AggregatedConfigSource.getDefaultInstance())) - .setServiceName("service-foo.googleapis.com")) - .setLbPolicy(LbPolicy.LEAST_REQUEST) - .setLeastRequestLbConfig( - LeastRequestLbConfig.newBuilder() - .setChoiceCount(UInt32Value.newBuilder().setValue(1)) - ) - .build(); - - thrown.expect(ResourceInvalidException.class); - thrown.expectMessage("Cluster cluster-foo.googleapis.com: invalid least_request_lb_config"); - ClientXdsClient.processCluster(cluster, new HashSet(), null, LRS_SERVER_INFO); + ClientXdsClient.processCluster(cluster, new HashSet(), null, LRS_SERVER_INFO, + LoadBalancerRegistry.getDefaultRegistry()); } @Test @@ -1877,7 +1798,8 @@ public class ClientXdsClientDataTest { .setServiceName("service-foo.googleapis.com")) .setLbPolicy(LbPolicy.ROUND_ROBIN) .build(); - ClientXdsClient.processCluster(cluster1, retainedEdsResources, null, LRS_SERVER_INFO); + ClientXdsClient.processCluster(cluster1, retainedEdsResources, null, LRS_SERVER_INFO, + LoadBalancerRegistry.getDefaultRegistry()); Cluster cluster2 = Cluster.newBuilder() .setName("cluster-foo.googleapis.com") @@ -1890,7 +1812,8 @@ public class ClientXdsClientDataTest { .setServiceName("service-foo.googleapis.com")) .setLbPolicy(LbPolicy.ROUND_ROBIN) .build(); - ClientXdsClient.processCluster(cluster2, retainedEdsResources, null, LRS_SERVER_INFO); + ClientXdsClient.processCluster(cluster2, retainedEdsResources, null, LRS_SERVER_INFO, + LoadBalancerRegistry.getDefaultRegistry()); Cluster cluster3 = Cluster.newBuilder() .setName("cluster-foo.googleapis.com") @@ -1908,7 +1831,8 @@ public class ClientXdsClientDataTest { thrown.expectMessage( "Cluster cluster-foo.googleapis.com: field eds_cluster_config must be set to indicate to" + " use EDS over ADS or self ConfigSource"); - ClientXdsClient.processCluster(cluster3, retainedEdsResources, null, LRS_SERVER_INFO); + ClientXdsClient.processCluster(cluster3, retainedEdsResources, null, LRS_SERVER_INFO, + LoadBalancerRegistry.getDefaultRegistry()); } @Test diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index 6a8481f536..bd662b1da0 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -23,6 +23,7 @@ import static io.grpc.xds.AbstractXdsClient.ResourceType.EDS; import static io.grpc.xds.AbstractXdsClient.ResourceType.LDS; import static io.grpc.xds.AbstractXdsClient.ResourceType.RDS; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -56,6 +57,9 @@ import io.grpc.internal.BackoffPolicy; import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock.ScheduledTask; import io.grpc.internal.FakeClock.TaskFilter; +import io.grpc.internal.JsonUtil; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.internal.TimeProvider; import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.AbstractXdsClient.ResourceType; @@ -73,7 +77,6 @@ import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsUpdate; import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; -import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy; import io.grpc.xds.XdsClient.EdsResourceWatcher; import io.grpc.xds.XdsClient.EdsUpdate; import io.grpc.xds.XdsClient.LdsResourceWatcher; @@ -1625,7 +1628,8 @@ public abstract class ClientXdsClientTestBase { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); + assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) + .getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1647,7 +1651,8 @@ public abstract class ClientXdsClientTestBase { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); + assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) + .getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1674,8 +1679,9 @@ public abstract class ClientXdsClientTestBase { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.LEAST_REQUEST); - assertThat(cdsUpdate.choiceCount()).isEqualTo(3); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("least_request_experimental"); + assertThat(lbConfig.getRawConfigValue().get("choiceCount")).isEqualTo(3); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1701,9 +1707,12 @@ public abstract class ClientXdsClientTestBase { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.RING_HASH); - assertThat(cdsUpdate.minRingSize()).isEqualTo(10L); - assertThat(cdsUpdate.maxRingSize()).isEqualTo(100L); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("ring_hash_experimental"); + assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "minRingSize")).isEqualTo( + 10L); + assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "maxRingSize")).isEqualTo( + 100L); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1727,7 +1736,8 @@ public abstract class ClientXdsClientTestBase { CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.AGGREGATE); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); + assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) + .getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.prioritizedClusterNames()).containsExactlyElementsIn(candidates).inOrder(); verifyResourceMetadataAcked(CDS, CDS_RESOURCE, clusterAggregate, VERSION_1, TIME_INCREMENT); verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); @@ -1748,7 +1758,8 @@ public abstract class ClientXdsClientTestBase { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); + assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) + .getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isEqualTo(200L); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1892,7 +1903,8 @@ public abstract class ClientXdsClientTestBase { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); + assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) + .getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1934,7 +1946,8 @@ public abstract class ClientXdsClientTestBase { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS); assertThat(cdsUpdate.dnsHostName()).isEqualTo(dnsHostAddr + ":" + dnsHostPort); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); + assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) + .getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1953,7 +1966,8 @@ public abstract class ClientXdsClientTestBase { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); + assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) + .getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1961,6 +1975,53 @@ public abstract class ClientXdsClientTestBase { verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); } + // Assures that CDS updates identical to the current config are ignored. + @Test + public void cdsResourceUpdatedWithDuplicate() { + DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher); + + String edsService = "eds-service-bar.googleapis.com"; + String transportSocketName = "envoy.transport_sockets.tls"; + Any roundRobinConfig = Any.pack( + mf.buildEdsCluster(CDS_RESOURCE, edsService, "round_robin", null, null, true, null, + transportSocketName, null + )); + Any ringHashConfig = Any.pack( + mf.buildEdsCluster(CDS_RESOURCE, edsService, "ring_hash_experimental", + mf.buildRingHashLbConfig("xx_hash", 1, 2), null, true, null, + transportSocketName, null + )); + Any leastRequestConfig = Any.pack( + mf.buildEdsCluster(CDS_RESOURCE, edsService, "least_request_experimental", + null, mf.buildLeastRequestLbConfig(2), true, null, + transportSocketName, null + )); + + // Configure with round robin, the update should be sent to the watcher. + call.sendResponse(CDS, roundRobinConfig, VERSION_2, "0001"); + verify(cdsResourceWatcher, times(1)).onChanged(isA(CdsUpdate.class)); + + // Second update is identical, watcher should not get an additional update. + call.sendResponse(CDS, roundRobinConfig, VERSION_2, "0002"); + verify(cdsResourceWatcher, times(1)).onChanged(isA(CdsUpdate.class)); + + // Now we switch to ring hash so the watcher should be notified. + call.sendResponse(CDS, ringHashConfig, VERSION_2, "0003"); + verify(cdsResourceWatcher, times(2)).onChanged(isA(CdsUpdate.class)); + + // Second update to ring hash should not result in watcher being notified. + call.sendResponse(CDS, ringHashConfig, VERSION_2, "0004"); + verify(cdsResourceWatcher, times(2)).onChanged(isA(CdsUpdate.class)); + + // Now we switch to least request so the watcher should be notified. + call.sendResponse(CDS, leastRequestConfig, VERSION_2, "0005"); + verify(cdsResourceWatcher, times(3)).onChanged(isA(CdsUpdate.class)); + + // Second update to least request should not result in watcher being notified. + call.sendResponse(CDS, leastRequestConfig, VERSION_2, "0006"); + verify(cdsResourceWatcher, times(3)).onChanged(isA(CdsUpdate.class)); + } + @Test public void cdsResourceDeleted() { DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher); @@ -1974,7 +2035,8 @@ public abstract class ClientXdsClientTestBase { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); + assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) + .getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -2026,7 +2088,8 @@ public abstract class ClientXdsClientTestBase { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS); assertThat(cdsUpdate.dnsHostName()).isEqualTo(dnsHostAddr + ":" + dnsHostPort); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); + assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) + .getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -2035,7 +2098,8 @@ public abstract class ClientXdsClientTestBase { assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); + assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) + .getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -2044,7 +2108,8 @@ public abstract class ClientXdsClientTestBase { assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); - assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); + assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) + .getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); diff --git a/xds/src/test/java/io/grpc/xds/LegacyLoadBalancerConfigFactoryTest.java b/xds/src/test/java/io/grpc/xds/LegacyLoadBalancerConfigFactoryTest.java new file mode 100644 index 0000000000..f90a181a18 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/LegacyLoadBalancerConfigFactoryTest.java @@ -0,0 +1,115 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; + +import com.google.protobuf.UInt32Value; +import com.google.protobuf.UInt64Value; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig.HashFunction; +import io.grpc.internal.JsonUtil; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.ServiceConfigUtil.LbConfig; +import io.grpc.xds.ClientXdsClient.ResourceInvalidException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit test for {@link LegacyLoadBalancerConfigFactory}. + */ +@RunWith(JUnit4.class) +public class LegacyLoadBalancerConfigFactoryTest { + + @Test + public void roundRobin() throws ResourceInvalidException { + Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.ROUND_ROBIN).build(); + + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig( + LegacyLoadBalancerConfigFactory.newConfig(cluster, true)); + + assertThat(lbConfig.getPolicyName()).isEqualTo("round_robin"); + assertThat(lbConfig.getRawConfigValue()).isEmpty(); + } + + @Test + public void ringHash() throws ResourceInvalidException { + Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.RING_HASH).setRingHashLbConfig( + RingHashLbConfig.newBuilder() + .setMinimumRingSize(UInt64Value.newBuilder().setValue(1).build()) + .setMaximumRingSize(UInt64Value.newBuilder().setValue(2).build()).build()).build(); + + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig( + LegacyLoadBalancerConfigFactory.newConfig(cluster, true)); + + assertThat(lbConfig.getPolicyName()).isEqualTo("ring_hash_experimental"); + assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "minRingSize")).isEqualTo(1); + assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "maxRingSize")).isEqualTo(2); + } + + @Test + public void ringHash_invalidHash() throws ResourceInvalidException { + Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.RING_HASH).setRingHashLbConfig( + RingHashLbConfig.newBuilder().setHashFunction(HashFunction.MURMUR_HASH_2)).build(); + + try { + ServiceConfigUtil.unwrapLoadBalancingConfig( + LegacyLoadBalancerConfigFactory.newConfig(cluster, true)); + } catch (ResourceInvalidException e) { + assertThat(e).hasMessageThat().contains("invalid ring hash function"); + return; + } + fail("ResourceInvalidException not thrown"); + } + + @Test + public void leastRequest() throws ResourceInvalidException { + System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "true"); + + Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.LEAST_REQUEST) + .setLeastRequestLbConfig(LeastRequestLbConfig.newBuilder() + .setChoiceCount(UInt32Value.newBuilder().setValue(10).build())).build(); + + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig( + LegacyLoadBalancerConfigFactory.newConfig(cluster, true)); + + assertThat(lbConfig.getPolicyName()).isEqualTo("least_request_experimental"); + assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "choiceCount")).isEqualTo(10); + } + + + @Test + public void leastRequest_notEnabled() throws ResourceInvalidException { + System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "false"); + + Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.LEAST_REQUEST).build(); + + try { + ServiceConfigUtil.unwrapLoadBalancingConfig( + LegacyLoadBalancerConfigFactory.newConfig(cluster, false)); + } catch (ResourceInvalidException e) { + assertThat(e).hasMessageThat().contains("unsupported lb policy"); + return; + } + fail("ResourceInvalidException not thrown"); + } +}