xds: implement new CDS LB policy for supporting aggregate clusters (#7722)

The new CDS LB policy supports discovering aggregate clusters and logical DNS clusters. If the cluster given by its config is an aggregate cluster, it recursively resolves into a list of underlying clusters. It creates a cluster_resolver LB policy as its child policy that receives a list of DiscoveryMechanisms with each containing cluster-level configurations for a single non-aggregate cluster.
This commit is contained in:
Chengyuan Zhang 2021-01-14 13:17:55 -08:00 committed by GitHub
parent ffcc360ba9
commit a584baf86a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 914 additions and 0 deletions

View File

@ -0,0 +1,328 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import javax.annotation.Nullable;
/**
* Load balancer for cds_experimental LB policy. One instance per top-level cluster.
* The top-level cluster may be a plain EDS/logical-DNS cluster or an aggregate cluster formed
* by a group of sub-clusters in a tree hierarchy.
*/
final class CdsLoadBalancer2 extends LoadBalancer {
private final XdsLogger logger;
private final Helper helper;
private final SynchronizationContext syncContext;
private final LoadBalancerRegistry lbRegistry;
// Following fields are effectively final.
private ObjectPool<XdsClient> xdsClientPool;
private XdsClient xdsClient;
private CdsLbState cdsLbState;
private ResolvedAddresses resolvedAddresses;
CdsLoadBalancer2(Helper helper) {
this(helper, LoadBalancerRegistry.getDefaultRegistry());
}
@VisibleForTesting
CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
logger.log(XdsLogLevel.INFO, "Created");
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
if (this.resolvedAddresses != null) {
return;
}
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
this.resolvedAddresses = resolvedAddresses;
xdsClientPool = resolvedAddresses.getAttributes().get(XdsAttributes.XDS_CLIENT_POOL);
xdsClient = xdsClientPool.getObject();
CdsConfig config = (CdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
logger.log(XdsLogLevel.INFO, "Config: {0}", config);
cdsLbState = new CdsLbState(config.name);
cdsLbState.start();
}
@Override
public void handleNameResolutionError(Status error) {
logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
if (cdsLbState != null && cdsLbState.childLb != null) {
cdsLbState.childLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
@Override
public void shutdown() {
logger.log(XdsLogLevel.INFO, "Shutdown");
if (cdsLbState != null) {
cdsLbState.shutdown();
}
if (xdsClientPool != null) {
xdsClientPool.returnObject(xdsClient);
}
}
/**
* The state of a CDS working session of {@link CdsLoadBalancer2}. Created and started when
* receiving the CDS LB policy config with the top-level cluster name.
*/
private final class CdsLbState {
private final ClusterState root;
private LoadBalancer childLb;
private CdsLbState(String rootCluster) {
root = new ClusterState(rootCluster);
}
private void start() {
root.start();
}
private void shutdown() {
root.shutdown();
}
private void handleClusterDiscovered() {
List<DiscoveryMechanism> instances = new ArrayList<>();
// Level-order traversal.
// Collect configurations for all non-aggregate (leaf) clusters.
Queue<ClusterState> queue = new ArrayDeque<>();
queue.add(root);
while (!queue.isEmpty()) {
int size = queue.size();
for (int i = 0; i < size; i++) {
ClusterState clusterState = queue.remove();
if (!clusterState.discovered) {
return; // do not proceed until all clusters discovered
}
if (clusterState.result == null) { // resource revoked or not exists
continue;
}
if (clusterState.isLeaf) {
DiscoveryMechanism instance;
if (clusterState.result instanceof EdsClusterConfig) {
EdsClusterConfig clusterConfig = (EdsClusterConfig) clusterState.result;
instance = DiscoveryMechanism.forEds(clusterState.name, clusterConfig.edsServiceName,
clusterConfig.lrsServerName, clusterConfig.maxConcurrentRequests,
clusterConfig.upstreamTlsContext);
} else { // logical DNS
LogicalDnsClusterConfig clusterConfig =
(LogicalDnsClusterConfig) clusterState.result;
instance = DiscoveryMechanism.forLogicalDns(clusterState.name,
clusterConfig.lrsServerName, clusterConfig.maxConcurrentRequests,
clusterConfig.upstreamTlsContext);
}
instances.add(instance);
} else {
if (clusterState.childClusterStates != null) {
queue.addAll(clusterState.childClusterStates.values());
}
}
}
}
if (instances.isEmpty()) { // none of non-aggregate clusters exists
if (childLb != null) {
childLb.shutdown();
childLb = null;
}
Status unavailable =
Status.UNAVAILABLE.withDescription("Cluster " + root.name + " unusable");
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
return;
}
String endpointPickingPolicy = root.result.lbPolicy;
LoadBalancerProvider localityPickingLbProvider =
lbRegistry.getProvider(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded
LoadBalancerProvider endpointPickingLbProvider =
lbRegistry.getProvider(endpointPickingPolicy);
ClusterResolverConfig config = new ClusterResolverConfig(
Collections.unmodifiableList(instances),
new PolicySelection(localityPickingLbProvider, null /* by cluster_resolver LB policy */),
new PolicySelection(endpointPickingLbProvider, null));
if (childLb == null) {
childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper);
}
childLb.handleResolvedAddresses(
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build());
}
private void handleClusterDiscoveryError(Status error) {
if (childLb != null) {
childLb.handleNameResolutionError(error);
} else {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
private final class ClusterState implements CdsResourceWatcher {
private final String name;
@Nullable
private Map<String, ClusterState> childClusterStates;
@Nullable
private ClusterConfig result;
// Following fields are effectively final.
private boolean isLeaf;
private boolean discovered;
private boolean shutdown;
private ClusterState(String name) {
this.name = name;
}
private void start() {
xdsClient.watchCdsResource(name, this);
}
void shutdown() {
shutdown = true;
xdsClient.cancelCdsResourceWatch(name, this);
if (childClusterStates != null) { // recursively shut down all descendants
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
}
}
}
@Override
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
// All watchers should receive the same error, so we only propagate it once.
if (ClusterState.this == root) {
handleClusterDiscoveryError(error);
}
}
});
}
@Override
public void onResourceDoesNotExist(String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
discovered = true;
result = null;
if (childClusterStates != null) {
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
}
childClusterStates = null;
}
handleClusterDiscovered();
}
});
}
@Override
public void onChanged(final CdsUpdate update) {
class ClusterDiscovered implements Runnable {
@Override
public void run() {
if (shutdown) {
return;
}
discovered = true;
result = update.clusterConfig;
if (update.clusterType == ClusterType.AGGREGATE) {
isLeaf = false;
AggregateClusterConfig clusterConfig = (AggregateClusterConfig) update.clusterConfig;
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}", update.clusterName);
logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig);
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : clusterConfig.prioritizedClusterNames) {
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState = new ClusterState(cluster);
childState.start();
newChildStates.put(cluster, childState);
} else {
newChildStates.put(cluster, childClusterStates.remove(cluster));
}
}
if (childClusterStates != null) { // stop subscribing to revoked child clusters
for (ClusterState watcher : childClusterStates.values()) {
watcher.shutdown();
}
}
childClusterStates = newChildStates;
} else if (update.clusterType == ClusterType.EDS) {
isLeaf = true;
EdsClusterConfig clusterConfig = (EdsClusterConfig) update.clusterConfig;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
update.clusterName, clusterConfig.edsServiceName);
logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig);
} else { // logical DNS
isLeaf = true;
LogicalDnsClusterConfig clusterConfig =
(LogicalDnsClusterConfig) update.clusterConfig;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName);
logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig);
}
handleClusterDiscovered();
}
}
syncContext.execute(new ClusterDiscovered());
}
}
}
}

View File

@ -0,0 +1,586 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME;
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
* Tests for {@link CdsLoadBalancer2}.
*/
@RunWith(JUnit4.class)
public class CdsLoadBalancer2Test {
private static final String CLUSTER = "cluster-foo.googleapis.com"; // cluster of entry point
private static final String EDS_SERVICE_NAME = "backend-service-1.googleapis.com";
private static final String LRS_SERVER_NAME = "lrs.googleapis.com";
private final UpstreamTlsContext upstreamTlsContext =
CommonTlsContextTestsUtil.buildUpstreamTlsContextFromFilenames(
CommonTlsContextTestsUtil.CLIENT_KEY_FILE,
CommonTlsContextTestsUtil.CLIENT_PEM_FILE,
CommonTlsContextTestsUtil.CA_PEM_FILE);
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});
private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
private final List<FakeLoadBalancer> childBalancers = new ArrayList<>();
private final FakeXdsClient xdsClient = new FakeXdsClient();
private final ObjectPool<XdsClient> xdsClientPool = new ObjectPool<XdsClient>() {
@Override
public XdsClient getObject() {
xdsClientRefs++;
return xdsClient;
}
@Override
public XdsClient returnObject(Object object) {
xdsClientRefs--;
return null;
}
};
@Mock
private Helper helper;
@Captor
private ArgumentCaptor<SubchannelPicker> pickerCaptor;
private int xdsClientRefs;
private CdsLoadBalancer2 loadBalancer;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
when(helper.getSynchronizationContext()).thenReturn(syncContext);
lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_RESOLVER_POLICY_NAME));
lbRegistry.register(new FakeLoadBalancerProvider(WEIGHTED_TARGET_POLICY_NAME));
lbRegistry.register(new FakeLoadBalancerProvider("round_robin"));
loadBalancer = new CdsLoadBalancer2(helper, lbRegistry);
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(
// Other attributes not used by cluster_resolver LB are omitted.
Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build())
.setLoadBalancingPolicyConfig(new CdsConfig(CLUSTER))
.build());
assertThat(Iterables.getOnlyElement(xdsClient.watchers.keySet())).isEqualTo(CLUSTER);
}
@After
public void tearDown() {
loadBalancer.shutdown();
assertThat(xdsClient.watchers).isEmpty();
assertThat(xdsClientRefs).isEqualTo(0);
}
@Test
public void discoverTopLevelEdsCluster() {
xdsClient.deliverEdsCluster(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L,
upstreamTlsContext);
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
assertThat(childLbConfig.discoveryMechanisms).hasSize(1);
DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME,
LRS_SERVER_NAME, 100L, upstreamTlsContext);
assertThat(childLbConfig.localityPickingPolicy.getProvider().getPolicyName())
.isEqualTo(WEIGHTED_TARGET_POLICY_NAME);
assertThat(childLbConfig.endpointPickingPolicy.getProvider().getPolicyName())
.isEqualTo("round_robin");
}
@Test
public void discoverTopLevelLogicalDnsCluster() {
xdsClient.deliverLogicalDnsCluster(CLUSTER, LRS_SERVER_NAME, 100L, upstreamTlsContext);
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
assertThat(childLbConfig.discoveryMechanisms).hasSize(1);
DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.LOGICAL_DNS, null,
LRS_SERVER_NAME, 100L, upstreamTlsContext);
assertThat(childLbConfig.localityPickingPolicy.getProvider().getPolicyName())
.isEqualTo(WEIGHTED_TARGET_POLICY_NAME);
assertThat(childLbConfig.endpointPickingPolicy.getProvider().getPolicyName())
.isEqualTo("round_robin");
}
@Test
public void nonAggregateCluster_resourceNotExist_returnErrorPicker() {
xdsClient.deliverResourceNotExist(CLUSTER);
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
Status unavailable = Status.UNAVAILABLE.withDescription("Cluster " + CLUSTER + " unusable");
assertPicker(pickerCaptor.getValue(), unavailable, null);
assertThat(childBalancers).isEmpty();
}
@Test
public void nonAggregateCluster_resourceUpdate() {
xdsClient.deliverEdsCluster(CLUSTER, null, null, 100L, upstreamTlsContext);
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, null, null, 100L,
upstreamTlsContext);
xdsClient.deliverEdsCluster(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, null);
childLbConfig = (ClusterResolverConfig) childBalancer.config;
instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME,
LRS_SERVER_NAME, 200L, null);
}
@Test
public void nonAggregateCluster_resourceRevoked() {
xdsClient.deliverLogicalDnsCluster(CLUSTER, null, 100L, upstreamTlsContext);
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.LOGICAL_DNS, null, null,
100L, upstreamTlsContext);
xdsClient.deliverResourceNotExist(CLUSTER);
assertThat(childBalancer.shutdown).isTrue();
Status unavailable = Status.UNAVAILABLE.withDescription("Cluster " + CLUSTER + " unusable");
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
assertPicker(pickerCaptor.getValue(), unavailable, null);
assertThat(childBalancer.shutdown).isTrue();
assertThat(childBalancers).isEmpty();
}
@Test
public void discoveryAggregateCluster() {
String cluster1 = "cluster-01.googleapis.com";
String cluster2 = "cluster-02.googleapis.com";
// CLUSTER (aggr.) -> [cluster1 (aggr.), cluster2 (logical DNS)]
xdsClient.deliverAggregateCluster(CLUSTER, Arrays.asList(cluster1, cluster2));
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
assertThat(childBalancers).isEmpty();
String cluster3 = "cluster-03.googleapis.com";
String cluster4 = "cluster-04.googleapis.com";
// cluster1 (aggr.) -> [cluster3 (EDS), cluster4 (EDS)]
xdsClient.deliverAggregateCluster(cluster1, Arrays.asList(cluster3, cluster4));
assertThat(xdsClient.watchers.keySet()).containsExactly(
CLUSTER, cluster1, cluster2, cluster3, cluster4);
assertThat(childBalancers).isEmpty();
xdsClient.deliverEdsCluster(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L,
upstreamTlsContext);
assertThat(childBalancers).isEmpty();
xdsClient.deliverLogicalDnsCluster(cluster2, null, 100L, null);
assertThat(childBalancers).isEmpty();
xdsClient.deliverEdsCluster(cluster4, null, LRS_SERVER_NAME, 300L,
null);
assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
assertThat(childLbConfig.discoveryMechanisms).hasSize(3);
// Clusters on higher level has higher priority: [cluster2, cluster3, cluster4]
assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(0), cluster2,
DiscoveryMechanism.Type.LOGICAL_DNS, null, null, 100L, null);
assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(1), cluster3,
DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext);
assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(2), cluster4,
DiscoveryMechanism.Type.EDS, null, LRS_SERVER_NAME, 300L, null);
assertThat(childLbConfig.localityPickingPolicy.getProvider().getPolicyName())
.isEqualTo(WEIGHTED_TARGET_POLICY_NAME);
assertThat(childLbConfig.endpointPickingPolicy.getProvider().getPolicyName())
.isEqualTo("round_robin");
}
@Test
public void aggregateCluster_noNonAggregateClusterExits_returnErrorPicker() {
String cluster1 = "cluster-01.googleapis.com";
// CLUSTER (aggr.) -> [cluster1 (EDS)]
xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1));
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
xdsClient.deliverResourceNotExist(cluster1);
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
Status unavailable = Status.UNAVAILABLE.withDescription("Cluster " + CLUSTER + " unusable");
assertPicker(pickerCaptor.getValue(), unavailable, null);
assertThat(childBalancers).isEmpty();
}
@Test
public void aggregateCluster_descendantClustersRevoked() {
String cluster1 = "cluster-01.googleapis.com";
String cluster2 = "cluster-02.googleapis.com";
// CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)]
xdsClient.deliverAggregateCluster(CLUSTER, Arrays.asList(cluster1, cluster2));
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
xdsClient.deliverLogicalDnsCluster(cluster2, LRS_SERVER_NAME, 100L, null);
xdsClient.deliverEdsCluster(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L,
upstreamTlsContext);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
assertThat(childLbConfig.discoveryMechanisms).hasSize(2);
assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(0), cluster1,
DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext);
assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(1), cluster2,
DiscoveryMechanism.Type.LOGICAL_DNS, null, LRS_SERVER_NAME, 100L, null);
// Revoke cluster1, should still be able to proceed with cluster2.
xdsClient.deliverResourceNotExist(cluster1);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
childLbConfig = (ClusterResolverConfig) childBalancer.config;
assertThat(childLbConfig.discoveryMechanisms).hasSize(1);
assertDiscoveryMechanism(Iterables.getOnlyElement(childLbConfig.discoveryMechanisms), cluster2,
DiscoveryMechanism.Type.LOGICAL_DNS, null, LRS_SERVER_NAME, 100L, null);
verify(helper, never()).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), any(SubchannelPicker.class));
// All revoked.
xdsClient.deliverResourceNotExist(cluster2);
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
Status unavailable = Status.UNAVAILABLE.withDescription("Cluster " + CLUSTER + " unusable");
assertPicker(pickerCaptor.getValue(), unavailable, null);
assertThat(childBalancer.shutdown).isTrue();
assertThat(childBalancers).isEmpty();
}
@Test
public void aggregateCluster_rootClusterRevoked() {
String cluster1 = "cluster-01.googleapis.com";
String cluster2 = "cluster-02.googleapis.com";
// CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)]
xdsClient.deliverAggregateCluster(CLUSTER, Arrays.asList(cluster1, cluster2));
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
xdsClient.deliverLogicalDnsCluster(cluster2, LRS_SERVER_NAME, 100L, null);
xdsClient.deliverEdsCluster(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L,
upstreamTlsContext);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
assertThat(childLbConfig.discoveryMechanisms).hasSize(2);
assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(0), cluster1,
DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext);
assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(1), cluster2,
DiscoveryMechanism.Type.LOGICAL_DNS, null, LRS_SERVER_NAME, 100L, null);
xdsClient.deliverResourceNotExist(CLUSTER);
assertThat(xdsClient.watchers.keySet())
.containsExactly(CLUSTER); // subscription to all descendant clusters cancelled
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
Status unavailable = Status.UNAVAILABLE.withDescription("Cluster " + CLUSTER + " unusable");
assertPicker(pickerCaptor.getValue(), unavailable, null);
assertThat(childBalancer.shutdown).isTrue();
assertThat(childBalancers).isEmpty();
}
@Test
public void aggregateCluster_intermediateClusterChanges() {
String cluster1 = "cluster-01.googleapis.com";
// CLUSTER (aggr.) -> [cluster1]
xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1));
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
// CLUSTER (aggr.) -> [cluster2 (aggr.)]
String cluster2 = "cluster-02.googleapis.com";
xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster2));
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2);
// cluster2 (aggr.) -> [cluster3 (EDS)]
String cluster3 = "cluster-03.googleapis.com";
xdsClient.deliverAggregateCluster(cluster2, Collections.singletonList(cluster3));
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3);
xdsClient.deliverEdsCluster(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L,
upstreamTlsContext);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
assertThat(childLbConfig.discoveryMechanisms).hasSize(1);
DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
assertDiscoveryMechanism(instance, cluster3, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME,
LRS_SERVER_NAME, 100L, upstreamTlsContext);
// cluster2 revoked
xdsClient.deliverResourceNotExist(cluster2);
assertThat(xdsClient.watchers.keySet())
.containsExactly(CLUSTER, cluster2); // cancelled subscription to cluster3
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
Status unavailable = Status.UNAVAILABLE.withDescription("Cluster " + CLUSTER + " unusable");
assertPicker(pickerCaptor.getValue(), unavailable, null);
assertThat(childBalancer.shutdown).isTrue();
assertThat(childBalancers).isEmpty();
}
@Test
public void aggregateCluster_discoveryErrorBeforeChildLbCreated_returnErrorPicker() {
String cluster1 = "cluster-01.googleapis.com";
// CLUSTER (aggr.) -> [cluster1]
xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1));
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
Status error = Status.RESOURCE_EXHAUSTED.withDescription("OOM");
xdsClient.deliverError(error);
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
assertPicker(pickerCaptor.getValue(), error, null);
assertThat(childBalancers).isEmpty();
}
@Test
public void aggregateCluster_discoveryErrorAfterChildLbCreated_propagateToChildLb() {
String cluster1 = "cluster-01.googleapis.com";
// CLUSTER (aggr.) -> [cluster1 (logical DNS)]
xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1));
xdsClient.deliverLogicalDnsCluster(cluster1, LRS_SERVER_NAME, 200L, null);
FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childLb.config;
assertThat(childLbConfig.discoveryMechanisms).hasSize(1);
Status error = Status.RESOURCE_EXHAUSTED.withDescription("OOM");
xdsClient.deliverError(error);
assertThat(childLb.upstreamError).isEqualTo(error);
assertThat(childLb.shutdown).isFalse(); // child LB may choose to keep working
}
@Test
public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErrorPicker() {
Status upstreamError = Status.UNAVAILABLE.withDescription("unreachable");
loadBalancer.handleNameResolutionError(upstreamError);
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
assertPicker(pickerCaptor.getValue(), upstreamError, null);
}
@Test
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
xdsClient.deliverEdsCluster(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L,
upstreamTlsContext);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.shutdown).isFalse();
loadBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription("unreachable"));
assertThat(childBalancer.upstreamError.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(childBalancer.upstreamError.getDescription()).isEqualTo("unreachable");
verify(helper, never()).updateBalancingState(
any(ConnectivityState.class), any(SubchannelPicker.class));
}
private static void assertPicker(SubchannelPicker picker, Status expectedStatus,
@Nullable Subchannel expectedSubchannel) {
PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class));
Status actualStatus = result.getStatus();
assertThat(actualStatus.getCode()).isEqualTo(expectedStatus.getCode());
assertThat(actualStatus.getDescription()).isEqualTo(expectedStatus.getDescription());
if (actualStatus.isOk()) {
assertThat(result.getSubchannel()).isSameInstanceAs(expectedSubchannel);
}
}
private static void assertDiscoveryMechanism(DiscoveryMechanism instance, String name,
DiscoveryMechanism.Type type, @Nullable String edsServiceName,
@Nullable String lrsServerName, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext) {
assertThat(instance.cluster).isEqualTo(name);
assertThat(instance.type).isEqualTo(type);
assertThat(instance.edsServiceName).isEqualTo(edsServiceName);
assertThat(instance.lrsServerName).isEqualTo(lrsServerName);
assertThat(instance.maxConcurrentRequests).isEqualTo(maxConcurrentRequests);
assertThat(instance.tlsContext).isEqualTo(tlsContext);
}
private final class FakeLoadBalancerProvider extends LoadBalancerProvider {
private final String policyName;
FakeLoadBalancerProvider(String policyName) {
this.policyName = policyName;
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
FakeLoadBalancer balancer = new FakeLoadBalancer(policyName, helper);
childBalancers.add(balancer);
return balancer;
}
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 0; // doesn't matter
}
@Override
public String getPolicyName() {
return policyName;
}
}
private final class FakeLoadBalancer extends LoadBalancer {
private final String name;
private final Helper helper;
private Object config;
private Status upstreamError;
private boolean shutdown;
FakeLoadBalancer(String name, Helper helper) {
this.name = name;
this.helper = helper;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
config = resolvedAddresses.getLoadBalancingPolicyConfig();
}
@Override
public void handleNameResolutionError(Status error) {
upstreamError = error;
}
@Override
public void shutdown() {
shutdown = true;
childBalancers.remove(this);
}
void deliverSubchannelState(final Subchannel subchannel, ConnectivityState state) {
SubchannelPicker picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel);
}
};
helper.updateBalancingState(state, picker);
}
}
private static final class FakeXdsClient extends XdsClient {
private final Map<String, CdsResourceWatcher> watchers = new HashMap<>();
@Override
void watchCdsResource(String resourceName, CdsResourceWatcher watcher) {
assertThat(watchers).doesNotContainKey(resourceName);
watchers.put(resourceName, watcher);
}
@Override
void cancelCdsResourceWatch(String resourceName, CdsResourceWatcher watcher) {
assertThat(watchers).containsKey(resourceName);
watchers.remove(resourceName);
}
private void deliverEdsCluster(String clusterName, @Nullable String edsServiceName,
@Nullable String lrsServerName, @Nullable Long maxConcurrentRequests,
@Nullable UpstreamTlsContext tlsContext) {
if (watchers.containsKey(clusterName)) {
EdsClusterConfig clusterConfig = new EdsClusterConfig("round_robin", edsServiceName,
lrsServerName, maxConcurrentRequests, tlsContext);
CdsUpdate update = new CdsUpdate(clusterName, ClusterType.EDS, clusterConfig);
watchers.get(clusterName).onChanged(update);
}
}
private void deliverLogicalDnsCluster(String clusterName, @Nullable String lrsServerName,
@Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext) {
if (watchers.containsKey(clusterName)) {
LogicalDnsClusterConfig clusterConfig = new LogicalDnsClusterConfig("round_robin",
lrsServerName, maxConcurrentRequests, tlsContext);
CdsUpdate update = new CdsUpdate(clusterName, ClusterType.LOGICAL_DNS, clusterConfig);
watchers.get(clusterName).onChanged(update);
}
}
private void deliverAggregateCluster(String clusterName, List<String> clusters) {
if (watchers.containsKey(clusterName)) {
AggregateClusterConfig clusterConfig = new AggregateClusterConfig("round_robin", clusters);
CdsUpdate update = new CdsUpdate(clusterName, ClusterType.AGGREGATE, clusterConfig);
watchers.get(clusterName).onChanged(update);
}
}
private void deliverResourceNotExist(String clusterName) {
if (watchers.containsKey(clusterName)) {
watchers.get(clusterName).onResourceDoesNotExist(clusterName);
}
}
private void deliverError(Status error) {
for (CdsResourceWatcher watcher : watchers.values()) {
watcher.onError(error);
}
}
}
}