diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index adaf4b4d67..c3f9a52404 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -91,9 +91,6 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { handleNameResolutionError(unavailableStatus); return unavailableStatus; } - - List cleanServers = new ArrayList<>(); - for (EquivalentAddressGroup eag : servers) { if (eag == null) { Status unavailableStatus = Status.UNAVAILABLE.withDescription( @@ -103,12 +100,13 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { handleNameResolutionError(unavailableStatus); return unavailableStatus; } - cleanServers.add(removeDuplicateAddresses(eag)); } // Since we have a new set of addresses, we are again at first pass firstPass = true; + List cleanServers = deDupAddresses(servers); + // We can optionally be configured to shuffle the address list. This can help better distribute // the load. if (resolvedAddresses.getLoadBalancingPolicyConfig() @@ -121,7 +119,6 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { } } - // Make sure we're storing our own list rather than what was passed in final ImmutableList newImmutableAddressGroups = ImmutableList.builder().addAll(cleanServers).build(); @@ -181,21 +178,23 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { return Status.OK; } - private static EquivalentAddressGroup removeDuplicateAddresses(EquivalentAddressGroup eag) { - Set addressSet = new HashSet<>(); - ArrayList addrs = new ArrayList<>(); // maintains order + private static List deDupAddresses(List groups) { + Set seenAddresses = new HashSet<>(); + List newGroups = new ArrayList<>(); - for (SocketAddress address : eag.getAddresses()) { - if (addressSet.add(address)) { - addrs.add(address); + for (EquivalentAddressGroup group : groups) { + List addrs = new ArrayList<>(); + for (SocketAddress addr : group.getAddresses()) { + if (seenAddresses.add(addr)) { + addrs.add(addr); + } + } + if (!addrs.isEmpty()) { + newGroups.add(new EquivalentAddressGroup(addrs, group.getAttributes())); } } - if (addressSet.size() == eag.getAddresses().size()) { - return eag; - } - - return new EquivalentAddressGroup(addrs, eag.getAttributes()); + return newGroups; } @Override diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java index 8816163bad..92178ccae2 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java @@ -36,7 +36,7 @@ public final class PickFirstLoadBalancerProvider extends LoadBalancerProvider { public static final String GRPC_PF_USE_HAPPY_EYEBALLS = "GRPC_PF_USE_HAPPY_EYEBALLS"; private static final String SHUFFLE_ADDRESS_LIST_KEY = "shuffleAddressList"; - static boolean enableNewPickFirst = + private static boolean enableNewPickFirst = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST", false); public static boolean isEnabledHappyEyeballs() { @@ -44,11 +44,6 @@ public final class PickFirstLoadBalancerProvider extends LoadBalancerProvider { return GrpcUtil.getFlag(GRPC_PF_USE_HAPPY_EYEBALLS, false); } - @VisibleForTesting - public static boolean isEnableNewPickFirst() { - return enableNewPickFirst; - } - @Override public boolean isAvailable() { return true; diff --git a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java index ff2e918b65..8d56968737 100644 --- a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java +++ b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java @@ -99,7 +99,7 @@ public class AutoConfiguredLoadBalancerFactoryTest { new FakeLoadBalancerProvider("test_lb2", testLbBalancer2, nextParsedConfigOrError2))); private final Class pfLbClass = - PickFirstLoadBalancerProvider.enableNewPickFirst + PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? PickFirstLeafLoadBalancer.class : PickFirstLoadBalancer.class; diff --git a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java index 8d701098be..7aae0c2731 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java @@ -565,7 +565,7 @@ public class PickFirstLeafLoadBalancerTest { } @Test - public void pickAWithDupAddressesUpDownUp() { + public void pickWithDupAddressesUpDownUp() { InOrder inOrder = inOrder(mockHelper); SocketAddress socketAddress = servers.get(0).getAddresses().get(0); EquivalentAddressGroup badEag = new EquivalentAddressGroup( @@ -599,6 +599,38 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); } + @Test + public void pickWithDupEagsUpDownUp() { + InOrder inOrder = inOrder(mockHelper); + List newServers = Lists.newArrayList(servers.get(0), servers.get(0)); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + + reset(mockHelper); + + // An error has happened. + Status error = Status.UNAVAILABLE.withDescription("boom!"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + inOrder.verify(mockHelper).refreshNameResolution(); + assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + + // Transition from TRANSIENT_ERROR to CONNECTING should also be ignored. + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + verifyNoMoreInteractions(mockHelper); + assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + + // Transition from CONNECTING to READY . + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + } + @Test public void nameResolutionError() { Status error = Status.NOT_FOUND.withDescription("nameResolutionError");