deduplicate addresses within a single EAG (#11342)

This commit is contained in:
Larry Safran 2024-06-27 16:12:24 -07:00 committed by GitHub
parent 603033f1fa
commit 3777c303f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 58 additions and 3 deletions

View File

@ -91,6 +91,9 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
handleNameResolutionError(unavailableStatus);
return unavailableStatus;
}
List<EquivalentAddressGroup> cleanServers = new ArrayList<>();
for (EquivalentAddressGroup eag : servers) {
if (eag == null) {
Status unavailableStatus = Status.UNAVAILABLE.withDescription(
@ -100,6 +103,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
handleNameResolutionError(unavailableStatus);
return unavailableStatus;
}
cleanServers.add(removeDuplicateAddresses(eag));
}
// Since we have a new set of addresses, we are again at first pass
@ -112,15 +116,14 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
PickFirstLeafLoadBalancerConfig config
= (PickFirstLeafLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
if (config.shuffleAddressList != null && config.shuffleAddressList) {
servers = new ArrayList<>(servers);
Collections.shuffle(servers,
Collections.shuffle(cleanServers,
config.randomSeed != null ? new Random(config.randomSeed) : new Random());
}
}
// Make sure we're storing our own list rather than what was passed in
final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
ImmutableList.<EquivalentAddressGroup>builder().addAll(servers).build();
ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build();
if (addressIndex == null) {
addressIndex = new Index(newImmutableAddressGroups);
@ -178,6 +181,23 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
return Status.OK;
}
private static EquivalentAddressGroup removeDuplicateAddresses(EquivalentAddressGroup eag) {
Set<SocketAddress> addressSet = new HashSet<>();
ArrayList<SocketAddress> addrs = new ArrayList<>(); // maintains order
for (SocketAddress address : eag.getAddresses()) {
if (addressSet.add(address)) {
addrs.add(address);
}
}
if (addressSet.size() == eag.getAddresses().size()) {
return eag;
}
return new EquivalentAddressGroup(addrs, eag.getAttributes());
}
@Override
public void handleNameResolutionError(Status error) {
if (rawConnectivityState == SHUTDOWN) {

View File

@ -564,6 +564,41 @@ public class PickFirstLeafLoadBalancerTest {
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
}
@Test
public void pickAWithDupAddressesUpDownUp() {
InOrder inOrder = inOrder(mockHelper);
SocketAddress socketAddress = servers.get(0).getAddresses().get(0);
EquivalentAddressGroup badEag = new EquivalentAddressGroup(
Lists.newArrayList(socketAddress, socketAddress), affinity);
List<EquivalentAddressGroup> newServers = Lists.newArrayList(badEag);
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build());
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockHelper).createSubchannel(createArgsCaptor.capture());
verify(mockSubchannel1).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
reset(mockHelper);
// An error has happened.
Status error = Status.UNAVAILABLE.withDescription("boom!");
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
inOrder.verify(mockHelper).refreshNameResolution();
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
// Transition from TRANSIENT_ERROR to CONNECTING should also be ignored.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
verifyNoMoreInteractions(mockHelper);
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
// Transition from CONNECTING to READY .
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
}
@Test
public void nameResolutionError() {
Status error = Status.NOT_FOUND.withDescription("nameResolutionError");