mirror of https://github.com/grpc/grpc-java.git
xds: eds reuse priority names for the same existing locality (#9287)
This commit is contained in:
parent
c2d33f15be
commit
dc8954d442
|
|
@ -58,9 +58,12 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.annotation.Nullable;
|
||||
|
|
@ -342,6 +345,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
private final class EdsClusterState extends ClusterState implements EdsResourceWatcher {
|
||||
@Nullable
|
||||
private final String edsServiceName;
|
||||
private Map<Locality, String> localityPriorityNames = Collections.emptyMap();
|
||||
int priorityNameGenId = 1;
|
||||
|
||||
private EdsClusterState(String name, @Nullable String edsServiceName,
|
||||
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
|
||||
|
|
@ -385,10 +390,10 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
List<DropOverload> dropOverloads = update.dropPolicies;
|
||||
List<EquivalentAddressGroup> addresses = new ArrayList<>();
|
||||
Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
|
||||
List<String> sortedPriorityNames = generatePriorityNames(name, localityLbEndpoints);
|
||||
for (Locality locality : localityLbEndpoints.keySet()) {
|
||||
LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
|
||||
int priority = localityLbInfo.priority();
|
||||
String priorityName = priorityName(name, priority);
|
||||
String priorityName = localityPriorityNames.get(locality);
|
||||
boolean discard = true;
|
||||
for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
|
||||
if (endpoint.isHealthy()) {
|
||||
|
|
@ -426,16 +431,15 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
logger.log(XdsLogLevel.INFO,
|
||||
"Cluster {0} has no usable priority/locality/endpoint", update.clusterName);
|
||||
}
|
||||
List<String> priorities = new ArrayList<>(prioritizedLocalityWeights.keySet());
|
||||
Collections.sort(priorities);
|
||||
sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet());
|
||||
Map<String, PriorityChildConfig> priorityChildConfigs =
|
||||
generateEdsBasedPriorityChildConfigs(
|
||||
name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext,
|
||||
endpointLbPolicy, lbRegistry, prioritizedLocalityWeights, dropOverloads);
|
||||
status = Status.OK;
|
||||
resolved = true;
|
||||
result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities,
|
||||
localityWeights);
|
||||
result = new ClusterResolutionResult(addresses, priorityChildConfigs,
|
||||
sortedPriorityNames, localityWeights);
|
||||
handleEndpointResourceUpdate();
|
||||
}
|
||||
}
|
||||
|
|
@ -443,6 +447,40 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
syncContext.execute(new EndpointsUpdated());
|
||||
}
|
||||
|
||||
private List<String> generatePriorityNames(String name,
|
||||
Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
|
||||
TreeMap<Integer, List<Locality>> todo = new TreeMap<>();
|
||||
for (Locality locality : localityLbEndpoints.keySet()) {
|
||||
int priority = localityLbEndpoints.get(locality).priority();
|
||||
if (!todo.containsKey(priority)) {
|
||||
todo.put(priority, new ArrayList<>());
|
||||
}
|
||||
todo.get(priority).add(locality);
|
||||
}
|
||||
Map<Locality, String> newNames = new HashMap<>();
|
||||
Set<String> usedNames = new HashSet<>();
|
||||
List<String> ret = new ArrayList<>();
|
||||
for (Integer priority: todo.keySet()) {
|
||||
String foundName = "";
|
||||
for (Locality locality : todo.get(priority)) {
|
||||
if (localityPriorityNames.containsKey(locality)
|
||||
&& usedNames.add(localityPriorityNames.get(locality))) {
|
||||
foundName = localityPriorityNames.get(locality);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if ("".equals(foundName)) {
|
||||
foundName = String.format("%s[child%d]", name, priorityNameGenId++);
|
||||
}
|
||||
for (Locality locality : todo.get(priority)) {
|
||||
newNames.put(locality, foundName);
|
||||
}
|
||||
ret.add(foundName);
|
||||
}
|
||||
localityPriorityNames = newNames;
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResourceDoesNotExist(final String resourceName) {
|
||||
syncContext.execute(new Runnable() {
|
||||
|
|
@ -718,7 +756,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer {
|
|||
* The ordering is undefined for priorities in different clusters.
|
||||
*/
|
||||
private static String priorityName(String cluster, int priority) {
|
||||
return cluster + "[priority" + priority + "]";
|
||||
return cluster + "[child" + priority + "]";
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -257,7 +257,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
.isEqualTo(50 * 60);
|
||||
assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
|
||||
PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config;
|
||||
assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[priority1]");
|
||||
assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[child1]");
|
||||
PriorityChildConfig priorityChildConfig =
|
||||
Iterables.getOnlyElement(priorityLbConfig.childConfigs.values());
|
||||
assertThat(priorityChildConfig.ignoreReresolution).isTrue();
|
||||
|
|
@ -298,7 +298,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
assertThat(addr.getAddresses()).isEqualTo(endpoint.getAddresses());
|
||||
assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
|
||||
PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config;
|
||||
assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[priority1]");
|
||||
assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[child1]");
|
||||
PriorityChildConfig priorityChildConfig =
|
||||
Iterables.getOnlyElement(priorityLbConfig.childConfigs.values());
|
||||
assertThat(priorityChildConfig.policySelection.getProvider().getPolicyName())
|
||||
|
|
@ -345,9 +345,9 @@ public class ClusterResolverLoadBalancerTest {
|
|||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint4, 100, true)),
|
||||
20 /* localityWeight */, 2 /* priority */);
|
||||
String priority1 = CLUSTER2 + "[priority1]";
|
||||
String priority2 = CLUSTER2 + "[priority2]";
|
||||
String priority3 = CLUSTER1 + "[priority1]";
|
||||
String priority1 = CLUSTER2 + "[child1]";
|
||||
String priority2 = CLUSTER2 + "[child2]";
|
||||
String priority3 = CLUSTER1 + "[child1]";
|
||||
|
||||
// CLUSTER2: locality1 with priority 1 and locality3 with priority 2.
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
|
|
@ -416,6 +416,81 @@ public class ClusterResolverLoadBalancerTest {
|
|||
assertThat(localityWeights).containsEntry(locality3, 20);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void verifyEdsPriorityNames(List<String> want,
|
||||
Map<Locality, LocalityLbEndpoints>... updates) {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
Arrays.asList(edsDiscoveryMechanism2), roundRobin);
|
||||
deliverLbConfig(config);
|
||||
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME2);
|
||||
assertThat(childBalancers).isEmpty();
|
||||
|
||||
for (Map<Locality, LocalityLbEndpoints> update: updates) {
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME2,
|
||||
update);
|
||||
}
|
||||
assertThat(childBalancers).hasSize(1);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
|
||||
PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config;
|
||||
assertThat(priorityLbConfig.priorities).isEqualTo(want);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void edsUpdatePriorityName_twoPriorities() {
|
||||
verifyEdsPriorityNames(Arrays.asList(CLUSTER2 + "[child1]", CLUSTER2 + "[child2]"),
|
||||
ImmutableMap.of(locality1, createEndpoints(1),
|
||||
locality2, createEndpoints(2)
|
||||
));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void edsUpdatePriorityName_addOnePriority() {
|
||||
verifyEdsPriorityNames(Arrays.asList(CLUSTER2 + "[child2]"),
|
||||
ImmutableMap.of(locality1, createEndpoints(1)),
|
||||
ImmutableMap.of(locality2, createEndpoints(1)
|
||||
));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void edsUpdatePriorityName_swapTwoPriorities() {
|
||||
verifyEdsPriorityNames(Arrays.asList(CLUSTER2 + "[child2]", CLUSTER2 + "[child1]",
|
||||
CLUSTER2 + "[child3]"),
|
||||
ImmutableMap.of(locality1, createEndpoints(1),
|
||||
locality2, createEndpoints(2),
|
||||
locality3, createEndpoints(3)
|
||||
),
|
||||
ImmutableMap.of(locality1, createEndpoints(2),
|
||||
locality2, createEndpoints(1),
|
||||
locality3, createEndpoints(3))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void edsUpdatePriorityName_mergeTwoPriorities() {
|
||||
verifyEdsPriorityNames(Arrays.asList(CLUSTER2 + "[child3]", CLUSTER2 + "[child1]"),
|
||||
ImmutableMap.of(locality1, createEndpoints(1),
|
||||
locality3, createEndpoints(3),
|
||||
locality2, createEndpoints(2)),
|
||||
ImmutableMap.of(locality1, createEndpoints(2),
|
||||
locality3, createEndpoints(1),
|
||||
locality2, createEndpoints(1)
|
||||
));
|
||||
}
|
||||
|
||||
private LocalityLbEndpoints createEndpoints(int priority) {
|
||||
return LocalityLbEndpoints.create(
|
||||
Arrays.asList(
|
||||
LbEndpoint.create(makeAddress("endpoint-addr-1"), 100, true),
|
||||
LbEndpoint.create(makeAddress("endpoint-addr-2"), 100, true)),
|
||||
70 /* localityWeight */, priority /* priority */);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() {
|
||||
ClusterResolverConfig config = new ClusterResolverConfig(
|
||||
|
|
@ -534,7 +609,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
LocalityLbEndpoints.create(
|
||||
Collections.singletonList(LbEndpoint.create(endpoint2, 200, true /* isHealthy */)),
|
||||
10 /* localityWeight */, 2 /* priority */);
|
||||
String priority2 = CLUSTER1 + "[priority2]";
|
||||
String priority2 = CLUSTER1 + "[child2]";
|
||||
xdsClient.deliverClusterLoadAssignment(
|
||||
EDS_SERVICE_NAME1,
|
||||
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
|
||||
|
|
@ -719,14 +794,14 @@ public class ClusterResolverLoadBalancerTest {
|
|||
assertThat(childBalancers).hasSize(1);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
assertThat(((PriorityLbConfig) childBalancer.config).priorities)
|
||||
.containsExactly(CLUSTER1 + "[priority1]", CLUSTER_DNS + "[priority0]").inOrder();
|
||||
.containsExactly(CLUSTER1 + "[child1]", CLUSTER_DNS + "[child0]").inOrder();
|
||||
assertAddressesEqual(Arrays.asList(endpoint3, endpoint1, endpoint2),
|
||||
childBalancer.addresses); // ordered by cluster then addresses
|
||||
assertAddressesEqual(AddressFilter.filter(AddressFilter.filter(
|
||||
childBalancer.addresses, CLUSTER1 + "[priority1]"), locality1.toString()),
|
||||
childBalancer.addresses, CLUSTER1 + "[child1]"), locality1.toString()),
|
||||
Collections.singletonList(endpoint3));
|
||||
assertAddressesEqual(AddressFilter.filter(AddressFilter.filter(
|
||||
childBalancer.addresses, CLUSTER_DNS + "[priority0]"),
|
||||
childBalancer.addresses, CLUSTER_DNS + "[child0]"),
|
||||
Locality.create("", "", "").toString()),
|
||||
Arrays.asList(endpoint1, endpoint2));
|
||||
}
|
||||
|
|
@ -751,7 +826,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
String priority = Iterables.getOnlyElement(
|
||||
((PriorityLbConfig) childBalancer.config).priorities);
|
||||
assertThat(priority).isEqualTo(CLUSTER_DNS + "[priority0]");
|
||||
assertThat(priority).isEqualTo(CLUSTER_DNS + "[child0]");
|
||||
assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), childBalancer.addresses);
|
||||
}
|
||||
|
||||
|
|
@ -775,7 +850,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
assertThat(childBalancers).hasSize(1);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
assertThat(((PriorityLbConfig) childBalancer.config).priorities)
|
||||
.containsExactly(CLUSTER1 + "[priority1]");
|
||||
.containsExactly(CLUSTER1 + "[child1]");
|
||||
assertAddressesEqual(Collections.singletonList(endpoint), childBalancer.addresses);
|
||||
assertThat(childBalancer.shutdown).isFalse();
|
||||
xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1);
|
||||
|
|
@ -899,7 +974,7 @@ public class ClusterResolverLoadBalancerTest {
|
|||
assertThat(childBalancers).hasSize(1);
|
||||
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
|
||||
assertThat(((PriorityLbConfig) childBalancer.config).priorities)
|
||||
.containsExactly(CLUSTER1 + "[priority1]", CLUSTER_DNS + "[priority0]");
|
||||
.containsExactly(CLUSTER1 + "[child1]", CLUSTER_DNS + "[child0]");
|
||||
assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), childBalancer.addresses);
|
||||
|
||||
loadBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription("unreachable"));
|
||||
|
|
|
|||
Loading…
Reference in New Issue