From 7f4c16e068eeb6c7049b7ab2f0420a72d2cc1fb2 Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Mon, 29 Jan 2024 12:50:58 -0800 Subject: [PATCH] internal:Happy eyeballs (#10731) * implement happy eyeballs --------- Co-authored-by: tonyjongyoonan --- api/src/main/java/io/grpc/LoadBalancer.java | 5 +- .../main/java/io/grpc/internal/GrpcUtil.java | 15 + .../internal/PickFirstLeafLoadBalancer.java | 255 ++- .../PickFirstLeafLoadBalancerTest.java | 1491 ++++++++++------- 4 files changed, 1129 insertions(+), 637 deletions(-) diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 56727778a7..84f108c419 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -955,6 +955,8 @@ public abstract class LoadBalancer { * *

It must be called from {@link #getSynchronizationContext the Synchronization Context} * + * @return Must return a valid Subchannel object, may not return null. + * * @since 1.22.0 */ public Subchannel createSubchannel(CreateSubchannelArgs args) { @@ -1287,7 +1289,8 @@ public abstract class LoadBalancer { */ public final EquivalentAddressGroup getAddresses() { List groups = getAllAddresses(); - Preconditions.checkState(groups.size() == 1, "%s does not have exactly one group", groups); + Preconditions.checkState(groups != null && groups.size() == 1, + "%s does not have exactly one group", groups); return groups.get(0); } diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 34f60c0f24..5bd3447317 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -24,6 +24,7 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.base.Stopwatch; +import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -948,5 +949,19 @@ public final class GrpcUtil { } } + public static boolean getFlag(String envVarName, boolean enableByDefault) { + String envVar = System.getenv(envVarName); + if (envVar == null) { + envVar = System.getProperty(envVarName); + } + if (enableByDefault) { + return Strings.isNullOrEmpty(envVar) || Boolean.parseBoolean(envVar); + } else { + return !Strings.isNullOrEmpty(envVar) && Boolean.parseBoolean(envVar); + } + } + + + private GrpcUtil() {} } diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index d254d4b0ab..3d6fadeffd 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -25,6 +25,7 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import io.grpc.Attributes; import io.grpc.ConnectivityState; @@ -33,6 +34,7 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.ExperimentalApi; import io.grpc.LoadBalancer; import io.grpc.Status; +import io.grpc.SynchronizationContext.ScheduledHandle; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; @@ -42,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -55,11 +58,21 @@ import javax.annotation.Nullable; @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10383") final class PickFirstLeafLoadBalancer extends LoadBalancer { private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName()); + @VisibleForTesting + static final int CONNECTION_DELAY_INTERVAL_MS = 250; + public static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS = + "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS"; private final Helper helper; private final Map subchannels = new HashMap<>(); private Index addressIndex; + private int numTf = 0; + private boolean firstPass = true; + @Nullable + private ScheduledHandle scheduleConnectionTask; private ConnectivityState rawConnectivityState = IDLE; private ConnectivityState concludedState = IDLE; + private final boolean enableHappyEyeballs = + GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, false); PickFirstLeafLoadBalancer(Helper helper) { this.helper = checkNotNull(helper, "helper"); @@ -67,7 +80,13 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { @Override public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { + if (rawConnectivityState == SHUTDOWN) { + return Status.FAILED_PRECONDITION.withDescription("Already shut down"); + } + List servers = resolvedAddresses.getAddresses(); + + // Validate the address list if (servers.isEmpty()) { Status unavailableStatus = Status.UNAVAILABLE.withDescription( "NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses() @@ -85,6 +104,10 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { return unavailableStatus; } } + + // Since we have a new set of addresses, we are again at first pass + firstPass = true; + // We can optionally be configured to shuffle the address list. This can help better distribute // the load. if (resolvedAddresses.getLoadBalancingPolicyConfig() @@ -92,47 +115,45 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { PickFirstLeafLoadBalancerConfig config = (PickFirstLeafLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); if (config.shuffleAddressList != null && config.shuffleAddressList) { - servers = new ArrayList(servers); + servers = new ArrayList<>(servers); Collections.shuffle(servers, config.randomSeed != null ? new Random(config.randomSeed) : new Random()); } } - final List newImmutableAddressGroups = - Collections.unmodifiableList(new ArrayList<>(servers)); + // Make sure we're storing our own list rather than what was passed in + final ImmutableList newImmutableAddressGroups = + ImmutableList.builder().addAll(servers).build(); if (addressIndex == null) { addressIndex = new Index(newImmutableAddressGroups); } else if (rawConnectivityState == READY) { - // If a ready subchannel exists in new address list, + // If the previous ready subchannel exists in new address list, // keep this connection and don't create new subchannels SocketAddress previousAddress = addressIndex.getCurrentAddress(); addressIndex.updateGroups(newImmutableAddressGroups); if (addressIndex.seekTo(previousAddress)) { return Status.OK; + } else { + addressIndex.reset(); // Previous ready subchannel not in the new list of addresses } - addressIndex.reset(); } else { addressIndex.updateGroups(newImmutableAddressGroups); } - // Create subchannels for all new addresses, preserving existing connections + // remove old subchannels that were not in new address list Set oldAddrs = new HashSet<>(subchannels.keySet()); + + // Flatten the new EAGs addresses Set newAddrs = new HashSet<>(); for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) { - for (SocketAddress addr : endpoint.getAddresses()) { - newAddrs.add(addr); - if (!subchannels.containsKey(addr)) { - createNewSubchannel(addr); - } - } + newAddrs.addAll(endpoint.getAddresses()); } - // remove old subchannels that were not in new address list + // Shut them down and remove them for (SocketAddress oldAddr : oldAddrs) { if (!newAddrs.contains(oldAddr)) { - subchannels.get(oldAddr).getSubchannel().shutdown(); - subchannels.remove(oldAddr); + subchannels.remove(oldAddr).getSubchannel().shutdown(); } } @@ -141,6 +162,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { // start connection attempt at first address rawConnectivityState = CONNECTING; updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult())); + cancelScheduleTask(); requestConnection(); } else if (rawConnectivityState == IDLE) { @@ -150,6 +172,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { } else if (rawConnectivityState == TRANSIENT_FAILURE) { // start connection attempt at first address + cancelScheduleTask(); requestConnection(); } @@ -162,15 +185,13 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { subchannelData.getSubchannel().shutdown(); } subchannels.clear(); - // NB(lukaszx0) Whether we should propagate the error unconditionally is arguable. It's fine - // for time being. updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); } void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { ConnectivityState newState = stateInfo.getState(); // Shutdown channels/previously relevant subchannels can still callback with state updates. - // To prevent pickers from returning these obselete subchannels, this logic + // To prevent pickers from returning these obsolete subchannels, this logic // is included to check if the current list of active subchannels includes this subchannel. SubchannelData subchannelData = subchannels.get(getAddress(subchannel)); if (subchannelData == null || subchannelData.getSubchannel() != subchannel) { @@ -179,6 +200,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { if (newState == SHUTDOWN) { return; } + if (newState == IDLE) { helper.refreshNameResolution(); } @@ -211,32 +233,48 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { rawConnectivityState = IDLE; updateBalancingState(IDLE, new RequestConnectionPicker(this)); break; + case CONNECTING: rawConnectivityState = CONNECTING; updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult())); break; + case READY: shutdownRemaining(subchannelData); addressIndex.seekTo(getAddress(subchannel)); rawConnectivityState = READY; updateHealthCheckedState(subchannelData); break; + case TRANSIENT_FAILURE: // If we are looking at current channel, request a connection if possible if (addressIndex.isValid() && subchannels.get(addressIndex.getCurrentAddress()).getSubchannel() == subchannel) { - addressIndex.increment(); - requestConnection(); + if (addressIndex.increment()) { + cancelScheduleTask(); + requestConnection(); // is recursive so might hit the end of the addresses + } + } - // If no addresses remaining, go into TRANSIENT_FAILURE - if (!addressIndex.isValid()) { + if (isPassComplete()) { + rawConnectivityState = TRANSIENT_FAILURE; + updateBalancingState(TRANSIENT_FAILURE, + new Picker(PickResult.withError(stateInfo.getStatus()))); + + // Refresh Name Resolution, but only when all 3 conditions are met + // * We are at the end of addressIndex + // * have had status reported for all subchannels. + // * And one of the following conditions: + // * Have had enough TF reported since we completed first pass + // * Just completed the first pass + if (++numTf >= addressIndex.size() || firstPass) { + firstPass = false; + numTf = 0; helper.refreshNameResolution(); - rawConnectivityState = TRANSIENT_FAILURE; - updateBalancingState(TRANSIENT_FAILURE, - new Picker(PickResult.withError(stateInfo.getStatus()))); } } break; + default: throw new IllegalArgumentException("Unsupported state:" + newState); } @@ -269,9 +307,16 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { @Override public void shutdown() { + log.log(Level.FINE, + "Shutting down, currently have {} subchannels created", subchannels.size()); + rawConnectivityState = SHUTDOWN; + concludedState = SHUTDOWN; + cancelScheduleTask(); + for (SubchannelData subchannelData : subchannels.values()) { subchannelData.getSubchannel().shutdown(); } + subchannels.clear(); } @@ -280,6 +325,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { * that all other subchannels must be shutdown. */ private void shutdownRemaining(SubchannelData activeSubchannelData) { + cancelScheduleTask(); for (SubchannelData subchannelData : subchannels.values()) { if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) { subchannelData.getSubchannel().shutdown(); @@ -291,29 +337,85 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { } /** - * Requests a connection to the next applicable address' subchannel, creating one if necessary - * If the current channel has already attempted a connection, we attempt a connection - * to the next address/subchannel in our list. - */ + * Requests a connection to the next applicable address' subchannel, creating one if necessary. + * Schedules a connection to next address in list as well. + * If the current channel has already attempted a connection, we attempt a connection + * to the next address/subchannel in our list. We assume that createNewSubchannel will never + * return null. + */ @Override public void requestConnection() { - if (subchannels.size() == 0) { + if (addressIndex == null || !addressIndex.isValid() || rawConnectivityState == SHUTDOWN ) { return; } - if (addressIndex.isValid()) { - Subchannel subchannel = subchannels.containsKey(addressIndex.getCurrentAddress()) - ? subchannels.get(addressIndex.getCurrentAddress()).getSubchannel() - : createNewSubchannel(addressIndex.getCurrentAddress()); - ConnectivityState subchannelState = - subchannels.get(addressIndex.getCurrentAddress()).getState(); - if (subchannelState == IDLE) { + Subchannel subchannel; + SocketAddress currentAddress; + currentAddress = addressIndex.getCurrentAddress(); + subchannel = subchannels.containsKey(currentAddress) + ? subchannels.get(currentAddress).getSubchannel() + : createNewSubchannel(currentAddress); + + ConnectivityState subchannelState = subchannels.get(currentAddress).getState(); + switch (subchannelState) { + case IDLE: subchannel.requestConnection(); - } else if (subchannelState == CONNECTING || subchannelState == TRANSIENT_FAILURE) { + subchannels.get(currentAddress).updateState(CONNECTING); + scheduleNextConnection(); + break; + case CONNECTING: + if (enableHappyEyeballs) { + scheduleNextConnection(); + } else { + subchannel.requestConnection(); + } + break; + case TRANSIENT_FAILURE: addressIndex.increment(); requestConnection(); + break; + case READY: // Shouldn't ever happen + log.warning("Requesting a connection even though we have a READY subchannel"); + break; + case SHUTDOWN: + default: + // Makes checkstyle happy + } + } + + + /** + * Happy Eyeballs + * Schedules connection attempt to happen after a delay to the next available address. + */ + private void scheduleNextConnection() { + if (!enableHappyEyeballs + || (scheduleConnectionTask != null && scheduleConnectionTask.isPending())) { + return; + } + + class StartNextConnection implements Runnable { + @Override + public void run() { + scheduleConnectionTask = null; + if (addressIndex.increment()) { + requestConnection(); + } } } + + scheduleConnectionTask = helper.getSynchronizationContext().schedule( + new StartNextConnection(), + CONNECTION_DELAY_INTERVAL_MS, + TimeUnit.MILLISECONDS, + helper.getScheduledExecutorService()); + } + + private void cancelScheduleTask() { + if (scheduleConnectionTask != null) { + scheduleConnectionTask.cancel(); + scheduleConnectionTask = null; + } } private Subchannel createNewSubchannel(SocketAddress addr) { @@ -324,6 +426,10 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { new EquivalentAddressGroup(addr))) .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener) .build()); + if (subchannel == null) { + log.warning("Was not able to create subchannel for " + addr); + throw new IllegalStateException("Can't create subchannel"); + } SubchannelData subchannelData = new SubchannelData(subchannel, IDLE, hcListener); hcListener.subchannelData = subchannelData; subchannels.put(addr, subchannelData); @@ -331,15 +437,23 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { if (attrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) { hcListener.healthStateInfo = ConnectivityStateInfo.forNonError(READY); } - subchannel.start(new SubchannelStateListener() { - @Override - public void onSubchannelState(ConnectivityStateInfo stateInfo) { - processSubchannelState(subchannel, stateInfo); - } - }); + subchannel.start(stateInfo -> processSubchannelState(subchannel, stateInfo)); return subchannel; } + private boolean isPassComplete() { + if (addressIndex == null || addressIndex.isValid() + || subchannels.size() < addressIndex.size()) { + return false; + } + for (SubchannelData sc : subchannels.values()) { + if (!sc.isCompletedConnectivityAttempt() ) { + return false; + } + } + return true; + } + private final class HealthListener implements SubchannelStateListener { private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE); private SubchannelData subchannelData; @@ -402,12 +516,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { @Override public PickResult pickSubchannel(PickSubchannelArgs args) { if (connectionRequested.compareAndSet(false, true)) { - helper.getSynchronizationContext().execute(new Runnable() { - @Override - public void run() { - pickFirstLeafLoadBalancer.requestConnection(); - } - }); + helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection); } return PickResult.withNoResult(); } @@ -415,6 +524,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { /** * Index as in 'i', the pointer to an entry. Not a "search index." + * All updates should be done in a synchronization context. */ @VisibleForTesting static final class Index { @@ -423,11 +533,11 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { private int addressIndex; public Index(List groups) { - this.addressGroups = groups; + this.addressGroups = groups != null ? groups : Collections.emptyList(); } public boolean isValid() { - // addressIndex will never be invalid + // Is invalid if empty or has incremented off the end return groupIndex < addressGroups.size(); } @@ -435,13 +545,24 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { return groupIndex == 0 && addressIndex == 0; } - public void increment() { + /** + * Move to next address in group. If last address in group move to first address of next group. + * @return false if went off end of the list, otherwise true + */ + public boolean increment() { + if (!isValid()) { + return false; + } + EquivalentAddressGroup group = addressGroups.get(groupIndex); addressIndex++; if (addressIndex >= group.getAddresses().size()) { groupIndex++; addressIndex = 0; + return groupIndex < addressGroups.size(); } + + return true; } public void reset() { @@ -450,22 +571,24 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { } public SocketAddress getCurrentAddress() { + if (!isValid()) { + throw new IllegalStateException("Index is past the end of the address group list"); + } return addressGroups.get(groupIndex).getAddresses().get(addressIndex); } public Attributes getCurrentEagAttributes() { + if (!isValid()) { + throw new IllegalStateException("Index is off the end of the address group list"); + } return addressGroups.get(groupIndex).getAttributes(); } - public List getGroups() { - return addressGroups; - } - /** * Update to new groups, resetting the current index. */ - public void updateGroups(List newGroups) { - addressGroups = newGroups; + public void updateGroups(ImmutableList newGroups) { + addressGroups = newGroups != null ? newGroups : Collections.emptyList(); reset(); } @@ -485,12 +608,17 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { } return false; } + + public int size() { + return (addressGroups != null) ? addressGroups.size() : 0; + } } private static final class SubchannelData { private final Subchannel subchannel; private ConnectivityState state; private final HealthListener healthListener; + private boolean completedConnectivityAttempt = false; public SubchannelData(Subchannel subchannel, ConnectivityState state, HealthListener subchannelHealthListener) { @@ -507,8 +635,17 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { return this.state; } + public boolean isCompletedConnectivityAttempt() { + return completedConnectivityAttempt; + } + private void updateState(ConnectivityState newState) { this.state = newState; + if (newState == READY || newState == TRANSIENT_FAILURE) { + completedConnectivityAttempt = true; + } else if (newState == IDLE) { + completedConnectivityAttempt = false; + } } private ConnectivityState getHealthState() { diff --git a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java index b944a48640..92222ac9af 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java @@ -21,16 +21,18 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY; import static io.grpc.LoadBalancer.HEALTH_CONSUMER_LISTENER_ARG_KEY; +import static io.grpc.internal.PickFirstLeafLoadBalancer.CONNECTION_DELAY_INTERVAL_MS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -41,11 +43,14 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.errorprone.annotations.Keep; import io.grpc.Attributes; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.CreateSubchannelArgs; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; @@ -54,20 +59,25 @@ import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelStateListener; +import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.SynchronizationContext; import io.grpc.internal.PickFirstLeafLoadBalancer.PickFirstLeafLoadBalancerConfig; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.junit.After; +import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.InOrder; @@ -77,11 +87,24 @@ import org.mockito.junit.MockitoRule; /** Unit test for {@link PickFirstLeafLoadBalancer}. */ -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) public class PickFirstLeafLoadBalancerTest { + public static final Status CONNECTION_ERROR = + Status.UNAVAILABLE.withDescription("Simulated connection error"); + + @Parameterized.Parameters(name = "{0}") + public static List enableHappyEyeballs() { + return Arrays.asList(true, false); + } + + @Parameterized.Parameter + public boolean enableHappyEyeballs; private PickFirstLeafLoadBalancer loadBalancer; private final List servers = Lists.newArrayList(); private static final Attributes.Key FOO = Attributes.Key.create("foo"); + // For scheduled executor + private final FakeClock fakeClock = new FakeClock(); + // For syncContext private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @Override @@ -100,8 +123,7 @@ public class PickFirstLeafLoadBalancerTest { private ArgumentCaptor createArgsCaptor; @Captor private ArgumentCaptor stateListenerCaptor; - @Mock - private Helper mockHelper; + private final Helper mockHelper = mock(Helper.class, delegatesTo(new MockHelperImpl())); @Mock private FakeSubchannel mockSubchannel1; @Mock @@ -110,60 +132,81 @@ public class PickFirstLeafLoadBalancerTest { private FakeSubchannel mockSubchannel3; @Mock private FakeSubchannel mockSubchannel4; + @Mock + private FakeSubchannel mockSubchannel5; @Mock // This LoadBalancer doesn't use any of the arg fields, as verified in tearDown(). private PickSubchannelArgs mockArgs; + private String originalHappyEyeballsEnabledValue; + @Before public void setUp() { - for (int i = 1; i < 5; i++) { + originalHappyEyeballsEnabledValue = + System.getProperty(PickFirstLeafLoadBalancer.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS); + System.setProperty(PickFirstLeafLoadBalancer.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, + enableHappyEyeballs ? "true" : "false"); + + for (int i = 1; i <= 5; i++) { SocketAddress addr = new FakeSocketAddress("server" + i); servers.add(new EquivalentAddressGroup(addr)); } - mockSubchannel1 = new FakeSubchannel(Lists.newArrayList( - new EquivalentAddressGroup(new FakeSocketAddress("fake"))), null); mockSubchannel1 = mock(FakeSubchannel.class); mockSubchannel2 = mock(FakeSubchannel.class); mockSubchannel3 = mock(FakeSubchannel.class); mockSubchannel4 = mock(FakeSubchannel.class); + mockSubchannel5 = mock(FakeSubchannel.class); when(mockSubchannel1.getAttributes()).thenReturn(Attributes.EMPTY); when(mockSubchannel2.getAttributes()).thenReturn(Attributes.EMPTY); when(mockSubchannel3.getAttributes()).thenReturn(Attributes.EMPTY); when(mockSubchannel4.getAttributes()).thenReturn(Attributes.EMPTY); - when(mockHelper.createSubchannel(any(CreateSubchannelArgs.class))) - .thenReturn(mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel4); + when(mockSubchannel5.getAttributes()).thenReturn(Attributes.EMPTY); when(mockSubchannel1.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(0))); when(mockSubchannel2.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(1))); when(mockSubchannel3.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(2))); when(mockSubchannel4.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(3))); + when(mockSubchannel5.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(4))); - when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); loadBalancer = new PickFirstLeafLoadBalancer(mockHelper); } @After - public void tearDown() throws Exception { + public void tearDown() { + if (originalHappyEyeballsEnabledValue == null) { + System.clearProperty(PickFirstLeafLoadBalancer.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS); + } else { + System.setProperty(PickFirstLeafLoadBalancer.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, + originalHappyEyeballsEnabledValue); + } + + loadBalancer.shutdown(); verifyNoMoreInteractions(mockArgs); } @Test - public void pickAfterResolved() throws Exception { + public void pickAfterResolved() { loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); - verify(mockHelper, times(4)).createSubchannel(createArgsCaptor.capture()); + forwardTimeByConnectionDelay(3); + int expectedCreates = enableHappyEyeballs ? 4 : 1; + verify(mockHelper, times(expectedCreates)).createSubchannel(createArgsCaptor.capture()); List argsList = createArgsCaptor.getAllValues(); assertThat(argsList.get(0).getAddresses().get(0)).isEqualTo(servers.get(0)); - assertThat(argsList.get(1).getAddresses().get(0)).isEqualTo(servers.get(1)); - assertThat(argsList.get(2).getAddresses().get(0)).isEqualTo(servers.get(2)); - assertThat(argsList.get(3).getAddresses().get(0)).isEqualTo(servers.get(3)); assertThat(argsList.get(0).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(1).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(2).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(3).getAddresses().size()).isEqualTo(1); + if (enableHappyEyeballs) { + assertThat(argsList.get(1).getAddresses().get(0)).isEqualTo(servers.get(1)); + assertThat(argsList.get(2).getAddresses().get(0)).isEqualTo(servers.get(2)); + assertThat(argsList.get(3).getAddresses().get(0)).isEqualTo(servers.get(3)); + assertThat(argsList.get(1).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(2).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(3).getAddresses().size()).isEqualTo(1); + } verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel1).requestConnection(); + verify(mockHelper, atLeast(0)).getScheduledExecutorService(); + verify(mockHelper, atLeast(0)).getSynchronizationContext(); - // Calling pickSubchannel() twice gave the same result + // Calling pickSubchannel() twice gave the same result and doesn't interact with mockHelper assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), pickerCaptor.getValue().pickSubchannel(mockArgs)); @@ -171,61 +214,70 @@ public class PickFirstLeafLoadBalancerTest { } @Test - public void pickAfterResolved_shuffle() throws Exception { + public void pickAfterResolved_shuffle() { + servers.remove(4); servers.remove(3); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity) .setLoadBalancingPolicyConfig(new PickFirstLeafLoadBalancerConfig(true, 123L)).build()); - verify(mockHelper, times(3)).createSubchannel(createArgsCaptor.capture()); + verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + + + forwardTimeByConnectionDelay(servers.size() - 1); + int expectedScs = enableHappyEyeballs ? servers.size() : 1; + verify(mockHelper, times(expectedScs)).createSubchannel(createArgsCaptor.capture()); + List argsList = createArgsCaptor.getAllValues(); // We should still see the same set of addresses. // Because we use a fixed seed, the addresses should always be shuffled in this order. assertThat(argsList.get(0).getAddresses().get(0)).isEqualTo(servers.get(1)); - assertThat(argsList.get(1).getAddresses().get(0)).isEqualTo(servers.get(0)); - assertThat(argsList.get(2).getAddresses().get(0)).isEqualTo(servers.get(2)); assertThat(argsList.get(0).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(1).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(2).getAddresses().size()).isEqualTo(1); + if (enableHappyEyeballs) { + assertThat(argsList.get(1).getAddresses().get(0)).isEqualTo(servers.get(0)); + assertThat(argsList.get(2).getAddresses().get(0)).isEqualTo(servers.get(2)); + assertThat(argsList.get(1).getAddresses().size()).isEqualTo(1); + assertThat(argsList.get(2).getAddresses().size()).isEqualTo(1); + verify(mockSubchannel1).requestConnection(); + } verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - verify(mockSubchannel1).requestConnection(); - - // Calling pickSubchannel() twice gave the same result - assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), - pickerCaptor.getValue().pickSubchannel(mockArgs)); + verify(mockHelper, atLeast(0)).getScheduledExecutorService(); + verify(mockHelper, atLeast(0)).getSynchronizationContext(); + // Calling pickSubchannel() twice gives the same result and doesn't interact with mockHelper + PickResult pick1 = pickerCaptor.getValue().pickSubchannel(mockArgs); + PickResult pick2 = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertEquals(pick1, pick2); verifyNoMoreInteractions(mockHelper); + assertThat(pick1.toString()).contains("subchannel=null"); + + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + PickResult pick3 = pickerCaptor.getValue().pickSubchannel(mockArgs); + PickResult pick4 = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertEquals(pick3, pick4); + assertThat(pick3.toString()).contains("subchannel=Mock"); } @Test - public void pickAfterResolved_noShuffle() throws Exception { + public void pickAfterResolved_noShuffle() { loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity) .setLoadBalancingPolicyConfig(new PickFirstLeafLoadBalancerConfig(false)).build()); - verify(mockHelper, times(4)).createSubchannel(createArgsCaptor.capture()); - List argsList = createArgsCaptor.getAllValues(); - assertThat(argsList.get(0).getAddresses().get(0)).isEqualTo(servers.get(0)); - assertThat(argsList.get(1).getAddresses().get(0)).isEqualTo(servers.get(1)); - assertThat(argsList.get(2).getAddresses().get(0)).isEqualTo(servers.get(2)); - assertThat(argsList.get(3).getAddresses().get(0)).isEqualTo(servers.get(3)); - assertThat(argsList.get(0).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(1).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(2).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(3).getAddresses().size()).isEqualTo(1); verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel1).requestConnection(); // Calling pickSubchannel() twice gave the same result assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), pickerCaptor.getValue().pickSubchannel(mockArgs)); - - verifyNoMoreInteractions(mockHelper); + assertNotNull(pickerCaptor.getValue().pickSubchannel(mockArgs)); } @Test - public void requestConnectionPicker() throws Exception { + public void requestConnectionPicker() { // Set up assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); List newServers = Lists.newArrayList(servers.get(0), servers.get(1), @@ -237,25 +289,20 @@ public class PickFirstLeafLoadBalancerTest { InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3); - // We initialize and start all subchannels - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); - - // We start connection attempt to the first address in the list + // We initialize and start first subchannel inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockHelper).createSubchannel(any()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + + // We start connection attempt to the first address in the list inOrder.verify(mockSubchannel1).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // If we send the first subchannel into idle ... stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); - SubchannelPicker picker = pickerCaptor.getValue(); // Calling pickSubchannel() requests a connection, gives the same result when called twice. @@ -275,18 +322,18 @@ public class PickFirstLeafLoadBalancerTest { loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); InOrder inOrder = inOrder(mockHelper, mockSubchannel1); + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockHelper).createSubchannel(createArgsCaptor.capture()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); inOrder.verify(mockSubchannel1).requestConnection(); - Status error = Status.UNAUTHENTICATED.withDescription("permission denied"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(mockHelper).refreshNameResolution(); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + inOrder.verify(mockHelper).refreshNameResolution(); + assertEquals(CONNECTION_ERROR, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); assertEquals(mockSubchannel1, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); @@ -298,60 +345,49 @@ public class PickFirstLeafLoadBalancerTest { } @Test - public void pickAfterResolvedAndUnchanged() throws Exception { + public void pickAfterResolvedAndUnchanged() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); - verify(mockSubchannel1).start(stateListenerCaptor.capture()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - verify(mockSubchannel1).requestConnection(); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); - verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + inOrder.verify(mockSubchannel1, times(1)).requestConnection(); + // Second acceptResolvedAddresses shouldn't do anything loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); - verify(mockSubchannel1).requestConnection(); - - verify(mockHelper, times(4)).createSubchannel(createArgsCaptor.capture()); - verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); - assertThat(createArgsCaptor.getValue()).isNotNull(); - verify(mockHelper) - .updateBalancingState(isA(ConnectivityState.class), isA(SubchannelPicker.class)); - - verifyNoMoreInteractions(mockHelper); + if (enableHappyEyeballs) { + inOrder.verify(mockSubchannel1, never()).requestConnection(); + } else { + inOrder.verify(mockSubchannel1, times(1)).requestConnection(); + } + inOrder.verify(mockHelper, never()).updateBalancingState(any(), any()); } @Test public void pickAfterResolvedAndChanged() { - SocketAddress socketAddr1 = new FakeSocketAddress("oldserver"); + SocketAddress socketAddr1 = new FakeSocketAddress("server1"); List oldServers = Lists.newArrayList(new EquivalentAddressGroup(socketAddr1)); - SocketAddress socketAddr2 = new FakeSocketAddress("newserver"); + SocketAddress socketAddr2 = new FakeSocketAddress("server2"); List newServers = Lists.newArrayList(new EquivalentAddressGroup(socketAddr2)); - InOrder inOrder = inOrder(mockHelper, mockSubchannel1); - - // accept resolved addresses + // accept resolved addresses which starts connection attempt to first address loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - verify(mockSubchannel1).start(any(SubchannelStateListener.class)); - verify(mockSubchannel1).getAttributes(); - - // start connection attempt to first address - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - verify(mockSubchannel1).requestConnection(); - + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); // updating the subchannel addresses is unnecessary, but doesn't hurt anything loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel1).shutdown(); - - verifyNoMoreInteractions(mockSubchannel1); - verify(mockSubchannel2).start(any(SubchannelStateListener.class)); + verify(mockSubchannel2).requestConnection(); } @Test @@ -365,30 +401,26 @@ public class PickFirstLeafLoadBalancerTest { .setAttributes(Attributes.EMPTY).build()); InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); SubchannelStateListener healthListener = createArgsCaptor.getValue() .getOption(HEALTH_CONSUMER_LISTENER_ARG_KEY); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(any(CreateSubchannelArgs.class)); - inOrder.verify(mockSubchannel2).start(any(SubchannelStateListener.class)); - - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); inOrder.verify(mockSubchannel1).requestConnection(); // subchannel | state | health // subchannel1 | CONNECTING| CONNECTING - // sunchannel2 | IDLE | IDLE + // subchannel2 | IDLE | IDLE stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); healthListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verify(mockHelper, times(0)).updateBalancingState(any(), any()); // subchannel | state | health // subchannel1 | READY | CONNECTING - // sunchannel2 | IDLE | IDLE + // subchannel2 | IDLE | IDLE stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); inOrder.verify(mockHelper, times(0)).updateBalancingState(any(), any()); - inOrder.verify(mockSubchannel2).shutdown(); // subchannel | state | health // subchannel1 | READY | READY @@ -407,8 +439,8 @@ public class PickFirstLeafLoadBalancerTest { healthListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); inOrder.verify(mockHelper, times(0)).updateBalancingState(any(), any()); - when(mockHelper.createSubchannel(any(CreateSubchannelArgs.class))).thenReturn(mockSubchannel2); stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.INTERNAL)); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); SubchannelStateListener healthListener2 = createArgsCaptor.getValue() .getOption(HEALTH_CONSUMER_LISTENER_ARG_KEY); @@ -417,19 +449,22 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockSubchannel2).requestConnection(); //ignore health update on non-ready subchannel healthListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + + verify(mockHelper, atLeast(0)).getSynchronizationContext(); + verify(mockHelper, atLeast(0)).getScheduledExecutorService(); verifyNoMoreInteractions(mockHelper); // subchannel | state | health // subchannel1 | TF | READY - // sunchannel2 | TF | IDLE + // subchannel2 | TF | IDLE stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); - inOrder.verify(mockHelper).refreshNameResolution(); inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + inOrder.verify(mockHelper).refreshNameResolution(); assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs) .getStatus()).isEqualTo(Status.UNAVAILABLE); // subchannel | state | health // subchannel1 | READY | READY - // sunchannel2 | TF | IDLE + // subchannel2 | TF | IDLE stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs) @@ -440,78 +475,86 @@ public class PickFirstLeafLoadBalancerTest { } @Test - public void pickAfterStateChangeAfterResolution() throws Exception { - InOrder inOrder = inOrder(mockHelper); + public void pickAfterStateChangeAfterResolution() { + InOrder inOrder = + inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel4); + + SubchannelStateListener[] stateListeners = new SubchannelStateListener[4]; + Subchannel[] subchannels = new Subchannel[] { + mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel4}; + + servers.remove(4); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); - inOrder.verify(mockHelper, times(4)).createSubchannel(createArgsCaptor.capture()); - List argsList = createArgsCaptor.getAllValues(); - assertThat(argsList.get(0).getAddresses().get(0)).isEqualTo(servers.get(0)); - assertThat(argsList.get(1).getAddresses().get(0)).isEqualTo(servers.get(1)); - assertThat(argsList.get(2).getAddresses().get(0)).isEqualTo(servers.get(2)); - assertThat(argsList.get(3).getAddresses().get(0)).isEqualTo(servers.get(3)); - assertThat(argsList.get(0).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(1).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(2).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(3).getAddresses().size()).isEqualTo(1); - verify(mockSubchannel1).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - verify(mockSubchannel3).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); - verify(mockSubchannel4).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener4 = stateListenerCaptor.getValue(); verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - reset(mockHelper); - when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + stateListeners[0] = stateListenerCaptor.getValue(); - stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + if (enableHappyEyeballs) { + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + stateListeners[1] = stateListenerCaptor.getValue(); + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + stateListeners[2] = stateListenerCaptor.getValue(); + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + stateListeners[3] = stateListenerCaptor.getValue(); + } + + reset(mockHelper); + + stateListeners[0].onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(mockHelper).refreshNameResolution(); inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); // subchannel reports connecting when pick subchannel is called assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); - stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + stateListeners[0].onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); Status error = Status.UNAVAILABLE.withDescription("boom!"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); - stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); - stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); - stateListener4.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(mockHelper).refreshNameResolution(); + + if (enableHappyEyeballs) { + for (SubchannelStateListener listener : stateListeners) { + listener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + } + } else { + stateListeners[0].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + for (int i = 1; i < stateListeners.length; i++) { + inOrder.verify(subchannels[i]).start(stateListenerCaptor.capture()); + stateListeners[i] = stateListenerCaptor.getValue(); + stateListeners[i].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + } + } + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + inOrder.verify(mockHelper).refreshNameResolution(); assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); - stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + stateListeners[0].onSubchannelState(ConnectivityStateInfo.forNonError(READY)); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(mockSubchannel1, picker.pickSubchannel(mockArgs).getSubchannel()); - - verify(mockHelper, atLeast(0)).getSynchronizationContext(); // Don't care - verifyNoMoreInteractions(mockHelper); } @Test - public void pickAfterResolutionAfterTransientValue() throws Exception { + public void pickAfterResolutionAfterTransientValue() { InOrder inOrder = inOrder(mockHelper); List newServers = Lists.newArrayList(servers.get(0)); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockHelper).createSubchannel(createArgsCaptor.capture()); verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - verify(mockSubchannel1).requestConnection(); + reset(mockHelper); - when(mockHelper.getSynchronizationContext()).thenReturn(syncContext); // An error has happened. Status error = Status.UNAVAILABLE.withDescription("boom!"); stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); - verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); inOrder.verify(mockHelper).refreshNameResolution(); assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); @@ -522,7 +565,7 @@ public class PickFirstLeafLoadBalancerTest { } @Test - public void nameResolutionError() throws Exception { + public void nameResolutionError() { Status error = Status.NOT_FOUND.withDescription("nameResolutionError"); loadBalancer.handleNameResolutionError(error); verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); @@ -534,73 +577,99 @@ public class PickFirstLeafLoadBalancerTest { } @Test - public void nameResolutionError_emptyAddressList() throws Exception { - servers.clear(); + public void nameResolutionError_emptyAddressList() { loadBalancer.acceptResolvedAddresses( - ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()).setAttributes(affinity).build()); verify(mockHelper).updateBalancingState(connectivityStateCaptor.capture(), pickerCaptor.capture()); PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs); assertThat(pickResult.getSubchannel()).isNull(); assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); - assertThat(pickResult.getStatus().getDescription()).contains("returned no usable address"); + assertThat(pickResult.getStatus().getDescription()).contains("no usable address"); verify(mockSubchannel1, never()).requestConnection(); verifyNoMoreInteractions(mockHelper); } @Test - public void nameResolutionSuccessAfterError() throws Exception { + public void nameResolutionAfterSufficientTFs() { InOrder inOrder = inOrder(mockHelper); + acceptXSubchannels(3); + Status error = Status.UNAVAILABLE.withDescription("boom!"); + // Initial subchannel gets TF, LB is still in CONNECTING + verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener1 = stateListenerCaptor.getValue(); + stateListener1.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + + // Second subchannel gets TF, no UpdateBalancingState called + verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper, never()).refreshNameResolution(); + inOrder.verify(mockHelper, never()).updateBalancingState(any(), any()); + + // Third subchannel gets TF, LB goes into TRANSIENT_FAILURE and does a refreshNameResolution + verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + inOrder.verify(mockHelper).refreshNameResolution(); + assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); + + // Only after we have TFs reported for # of subchannels do we call refreshNameResolution + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper, never()).refreshNameResolution(); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper, never()).refreshNameResolution(); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).refreshNameResolution(); + + // Now that we have refreshed, the count should have been reset + // Only after we have TFs reported for # of subchannels do we call refreshNameResolution + stateListener1.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper, never()).refreshNameResolution(); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper, never()).refreshNameResolution(); + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).refreshNameResolution(); + } + + @Test + public void nameResolutionSuccessAfterError() { loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError")); - inOrder.verify(mockHelper) + verify(mockHelper) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); verify(mockSubchannel1, never()).requestConnection(); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); - inOrder.verify(mockHelper, times(4)).createSubchannel(createArgsCaptor.capture()); - List argsList = createArgsCaptor.getAllValues(); - assertThat(argsList.get(0).getAddresses().get(0)).isEqualTo(servers.get(0)); - assertThat(argsList.get(1).getAddresses().get(0)).isEqualTo(servers.get(1)); - assertThat(argsList.get(2).getAddresses().get(0)).isEqualTo(servers.get(2)); - assertThat(argsList.get(3).getAddresses().get(0)).isEqualTo(servers.get(3)); - assertThat(argsList.get(0).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(1).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(2).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(3).getAddresses().size()).isEqualTo(1); - assertThat(argsList.get(0).getAttributes()).isEqualTo(Attributes.EMPTY); - assertThat(argsList.get(1).getAttributes()).isEqualTo(Attributes.EMPTY); - assertThat(argsList.get(2).getAttributes()).isEqualTo(Attributes.EMPTY); - assertThat(argsList.get(3).getAttributes()).isEqualTo(Attributes.EMPTY); + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - verify(mockSubchannel1).requestConnection(); + assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); - assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs) - .getSubchannel()); - - assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), - pickerCaptor.getValue().pickSubchannel(mockArgs)); - - verifyNoMoreInteractions(mockHelper); + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + assertEquals(mockSubchannel1, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); } @Test - public void nameResolutionErrorWithStateChanges() throws Exception { + public void nameResolutionErrorWithStateChanges() { List newServers = Lists.newArrayList(servers.get(0)); InOrder inOrder = inOrder(mockHelper); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); - verify(mockHelper).createSubchannel(createArgsCaptor.capture()); verify(mockSubchannel1).start(stateListenerCaptor.capture()); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); - inOrder.verify(mockHelper).refreshNameResolution(); inOrder.verify(mockHelper).updateBalancingState( eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + inOrder.verify(mockHelper).refreshNameResolution(); Status error = Status.NOT_FOUND.withDescription("nameResolutionError"); loadBalancer.handleNameResolutionError(error); inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); @@ -615,8 +684,6 @@ public class PickFirstLeafLoadBalancerTest { pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs); assertNull(pickResult.getSubchannel()); assertEquals(error2, pickResult.getStatus()); - - verifyNoMoreInteractions(mockHelper); } @Test @@ -628,24 +695,34 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); inOrder.verify(mockSubchannel1).requestConnection(); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); - // calling requestConnection() starts next subchannel + // calling requestConnection() only starts next subchannel when it is in TF loadBalancer.requestConnection(); - inOrder.verify(mockSubchannel2).requestConnection(); + inOrder.verify(mockSubchannel2, never()).start(any()); + + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); + loadBalancer.requestConnection(); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + int expectedRequests = enableHappyEyeballs ? 1 : 2; + inOrder.verify(mockSubchannel2, times(expectedRequests)).requestConnection(); + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); // calling requestConnection is now a no-op loadBalancer.requestConnection(); - verifyNoMoreInteractions(mockHelper); + inOrder.verify(mockHelper, never()).updateBalancingState(any(), any()); + inOrder.verify(mockSubchannel1, never()).requestConnection(); + if (enableHappyEyeballs) { + inOrder.verify(mockSubchannel2, never()).requestConnection(); + } else { + inOrder.verify(mockSubchannel2).requestConnection(); + } } @Test @@ -653,8 +730,8 @@ public class PickFirstLeafLoadBalancerTest { loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); assertFalse(loadBalancer.acceptResolvedAddresses( - ResolvedAddresses.newBuilder().setAddresses(Arrays.asList()) - .setAttributes(affinity).build()).isOk()); + ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()).setAttributes(affinity).build()).isOk()); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); } @@ -662,7 +739,7 @@ public class PickFirstLeafLoadBalancerTest { public void updateAddresses_eagListWithNull_returns_false() { loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); - List eags = Arrays.asList((EquivalentAddressGroup) null); + List eags = Collections.singletonList(null); assertFalse(loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(eags).setAttributes(affinity).build()).isOk()); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); @@ -674,22 +751,22 @@ public class PickFirstLeafLoadBalancerTest { mockSubchannel3, mockSubchannel4); // Creating first set of endpoints/addresses List oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + SubchannelStateListener stateListener2 = null; // Accept Addresses and verify proper connection flow assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + forwardTimeByConnectionDelay(); + if (enableHappyEyeballs) { + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + stateListener2 = stateListenerCaptor.getValue(); + } assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // First connection attempt is successful - inOrder.verify(mockSubchannel1).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); @@ -697,9 +774,6 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); // Going into IDLE state stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); @@ -714,24 +788,25 @@ public class PickFirstLeafLoadBalancerTest { loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); - // We create new channels, remove old ones, and keep intersecting ones - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + // New channels not created until there is a request, remove old ones, keep intersecting ones + inOrder.verify(mockHelper, never()).createSubchannel(any()); assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); verify(mockSubchannel1).shutdown(); - verify(mockSubchannel2).shutdown(); - assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); + if (enableHappyEyeballs) { + verify(mockSubchannel2).shutdown(); + } - // If obselete subchannel becomes ready, the state should not be affected + // If obsolete subchannel becomes ready, the state should not be affected stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); - stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + if (enableHappyEyeballs) { + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + } assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); - // Calling pickSubchannel() twice gave the same result - assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); + // Calling pickSubchannel() creates the subchannel and starts a connection + picker.pickSubchannel(mockArgs); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); // But the picker calls requestConnection() only once inOrder.verify(mockSubchannel3).requestConnection(); @@ -744,9 +819,6 @@ public class PickFirstLeafLoadBalancerTest { // Picking a subchannel returns subchannel 3 picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); } @Test @@ -754,6 +826,9 @@ public class PickFirstLeafLoadBalancerTest { InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel4); + SubchannelStateListener stateListener2 = null; + SubchannelStateListener stateListener4 = null; + // Creating first set of endpoints/addresses List oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); @@ -761,14 +836,15 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).requestConnection(); + if (enableHappyEyeballs) { + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + stateListener2 = stateListenerCaptor.getValue(); + } + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Creating second set of endpoints/addresses List newServers = Lists.newArrayList(servers.get(2), servers.get(3)); @@ -776,32 +852,40 @@ public class PickFirstLeafLoadBalancerTest { // Accept new resolved addresses to update loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + forwardTimeByConnectionDelay(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + inOrder.verify(mockSubchannel3).requestConnection(); SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener4 = stateListenerCaptor.getValue(); + if (enableHappyEyeballs) { + inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + stateListener4 = stateListenerCaptor.getValue(); + } assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Old subchannels should shut down (in no particular order) and request a connection verify(mockSubchannel1).shutdown(); - verify(mockSubchannel2).shutdown(); - inOrder.verify(mockSubchannel3).requestConnection(); + if (enableHappyEyeballs) { + verify(mockSubchannel2).shutdown(); + } assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // If old subchannel becomes ready, the state should not be affected stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); - stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + if (enableHappyEyeballs) { + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + } assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Fail connection attempt to third address - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Verify starting connection attempt to fourth address + if (!enableHappyEyeballs) { + inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + stateListener4 = stateListenerCaptor.getValue(); + } inOrder.verify(mockSubchannel4).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); @@ -813,20 +897,16 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); } @Test public void updateAddresses_disjoint_ready_twice() { - when(mockHelper.createSubchannel(any(CreateSubchannelArgs.class))) - .thenReturn(mockSubchannel1, mockSubchannel2, mockSubchannel3, - mockSubchannel4, mockSubchannel1, mockSubchannel2); InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel4); // Creating first set of endpoints/addresses List oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + SubchannelStateListener stateListener2 = null; + SubchannelStateListener stateListener4 = null; // Accept Addresses and verify proper connection flow assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); @@ -835,25 +915,33 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // First connection attempt is successful inOrder.verify(mockSubchannel1).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + + // Trigger second subchannel to connect + if (enableHappyEyeballs) { + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + stateListener2 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + } + + // Mark first subchannel ready, verify state change stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(mockSubchannel2).shutdown(); + + if (enableHappyEyeballs) { + // Successful connection shuts down other subchannel + inOrder.verify(mockSubchannel2).shutdown(); + } inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); + // Verify that picker returns correct subchannel assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); // Creating second set of endpoints/addresses List newServers = Lists.newArrayList(servers.get(2), servers.get(3)); @@ -861,44 +949,49 @@ public class PickFirstLeafLoadBalancerTest { // Accept new resolved addresses to update loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener4 = stateListenerCaptor.getValue(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).shutdown(); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + inOrder.verify(mockSubchannel3).requestConnection(); + + if (enableHappyEyeballs) { + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + stateListener4 = stateListenerCaptor.getValue(); + } + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); picker = pickerCaptor.getValue(); - // If obselete subchannel becomes ready, the state should not be affected + // If obsolete subchannel becomes ready, the state should not be affected stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); - stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + if (enableHappyEyeballs) { + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + } assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Calling pickSubchannel() twice gave the same result assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); - // But the picker calls requestConnection() only once - inOrder.verify(mockSubchannel3).requestConnection(); + // The picker should only call requestConnection() once picker = pickerCaptor.getValue(); assertEquals(PickResult.withNoResult(), picker.pickSubchannel(mockArgs)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Ready subchannel 3 stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); - // Successful connection shuts down other subchannel - inOrder.verify(mockSubchannel4).shutdown(); + + if (enableHappyEyeballs) { + // Successful connection shuts down other subchannel + inOrder.verify(mockSubchannel4).shutdown(); + } + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); picker = pickerCaptor.getValue(); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); // Verify that pickSubchannel() returns correct subchannel assertEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); // Creating third set of endpoints/addresses List newestServers = Lists.newArrayList(servers.get(0), servers.get(1)); @@ -906,15 +999,11 @@ public class PickFirstLeafLoadBalancerTest { // Second address update loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newestServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); - stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - stateListener2 = stateListenerCaptor.getValue(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel3).shutdown(); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + stateListener = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); picker = pickerCaptor.getValue(); // Calling pickSubchannel() twice gave the same result @@ -925,75 +1014,73 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(PickResult.withNoResult(), pickerCaptor.getValue().pickSubchannel(mockArgs)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - // If obselete subchannel becomes ready, the state should not be affected + // If obsolete subchannel becomes ready, the state should not be affected stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); - stateListener4.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + if (enableHappyEyeballs) { + stateListener4.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + } assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Connection attempt to address 1 is unsuccessful - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); // Starting connection attempt to address 2 - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + if (!enableHappyEyeballs) { + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + stateListener2 = stateListenerCaptor.getValue(); + } inOrder.verify(mockSubchannel2).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Connection attempt to address 2 is successful stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockSubchannel1).shutdown(); // Successful connection shuts down other subchannel - inOrder.verify(mockSubchannel1).shutdown(); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); picker = pickerCaptor.getValue(); // Verify that picker still returns correct subchannel assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); } @Test public void updateAddresses_disjoint_transient_failure() { - // Starting first connection attempt - when(mockHelper.createSubchannel(any(CreateSubchannelArgs.class))) - .thenReturn(mockSubchannel1, mockSubchannel2, mockSubchannel3, - mockSubchannel4, mockSubchannel1, mockSubchannel2); InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel4); + SubchannelStateListener stateListener2 = null; + // Creating first set of endpoints/addresses List addrs = Lists.newArrayList(servers.get(0), servers.get(1)); assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockSubchannel1).requestConnection(); + if (enableHappyEyeballs) { + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + stateListener2 = stateListenerCaptor.getValue(); + } assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Failing first connection attempt - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Starting second connection attempt - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + if (!enableHappyEyeballs) { + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + stateListener2 = stateListenerCaptor.getValue(); + } inOrder.verify(mockSubchannel2).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Failing second connection attempt - stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); // sticky transient failure assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); @@ -1006,25 +1093,22 @@ public class PickFirstLeafLoadBalancerTest { // subchannel 3 still attempts a connection even though we stay in transient failure assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener4 = stateListenerCaptor.getValue(); verify(mockSubchannel1).shutdown(); verify(mockSubchannel2).shutdown(); inOrder.verify(mockSubchannel3).requestConnection(); - // Obselete subchannels should not affect us + // obsolete subchannels should not affect us stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); // Third subchannel connection attempt is unsuccessful - stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); - + inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener4 = stateListenerCaptor.getValue(); inOrder.verify(mockSubchannel4).requestConnection(); // Fourth subchannel connection attempt is successful @@ -1036,9 +1120,6 @@ public class PickFirstLeafLoadBalancerTest { // Picking a subchannel returns subchannel 3 SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); } @Test @@ -1054,31 +1135,31 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + if (enableHappyEyeballs) { + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + } else { + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + } SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - inOrder.verify(mockSubchannel1).requestConnection(); // First connection attempt is successful - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); // Successful connection attempt shuts down other subchannels - inOrder.verify(mockSubchannel2).shutdown(); - inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); - SubchannelPicker picker = pickerCaptor.getValue(); + if (enableHappyEyeballs) { + inOrder.verify(mockSubchannel2).shutdown(); + } // Verify that picker returns correct subchannel + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); // Going into IDLE state, nothing should happen unless requested stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); @@ -1086,11 +1167,10 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockHelper).refreshNameResolution(); inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); picker = pickerCaptor.getValue(); - verifyNoMoreInteractions(mockHelper); // Creating second set of intersecting endpoints/addresses List newServers = - Lists.newArrayList(servers.get(0), servers.get(1), servers.get(3)); + Lists.newArrayList(servers.get(0), servers.get(2), servers.get(3)); // Accept new resolved addresses to update loadBalancer.acceptResolvedAddresses( @@ -1098,20 +1178,24 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); // We create new channels and remove old ones, keeping intersecting ones - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); + inOrder.verify(mockHelper, never()).createSubchannel(createArgsCaptor.capture()); + forwardTimeByConnectionDelay(2); + verify(mockSubchannel3, never()).start(stateListenerCaptor.capture()); + verify(mockSubchannel4, never()).start(stateListenerCaptor.capture()); - // If obselete subchannel becomes ready, the state should not be affected + // If obsolete subchannel becomes ready, the state should not be affected stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); // Calling pickSubchannel() twice gave the same result + inOrder.verify(mockHelper, never()).updateBalancingState(any(), any()); assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); + assertEquals(PickResult.withNoResult(), picker.pickSubchannel(mockArgs)); + verify(mockSubchannel3, never()).start(stateListenerCaptor.capture()); // But the picker calls requestConnection() only once inOrder.verify(mockSubchannel1).requestConnection(); + inOrder.verify(mockSubchannel1, never()).requestConnection(); // internal subchannel calls back and reports connecting stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); @@ -1123,12 +1207,10 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(READY, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); - // Picking a subchannel returns subchannel 1 + // Picking a subchannel returns subchannel 1 through multiple calls picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); } @Test @@ -1141,53 +1223,46 @@ public class PickFirstLeafLoadBalancerTest { Lists.newArrayList(servers.get(0), servers.get(1), servers.get(2)); // Accept Addresses and verify proper connection flow - assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - inOrder.verify(mockSubchannel1).requestConnection(); + forwardTimeByConnectionDelay(2); // callback from internal subchannel stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + inOrder.verify(mockSubchannel1).requestConnection(); // Creating second set of endpoints/addresses List newServers = - Lists.newArrayList(servers.get(0), servers.get(1), servers.get(3)); + Lists.newArrayList(servers.get(0), servers.get(1), servers.get(3), servers.get(4)); // Accept new resolved addresses to update loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper, never()).updateBalancingState(eq(CONNECTING), any()); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - // Don't unnecessarily create new subchannels and keep intersecting ones - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture()); - verifyNoMoreInteractions(mockHelper); + // keep intersecting ones kept and start over so nothing should try to connect + verify(mockSubchannel1, never()).shutdown(); + verify(mockSubchannel2, never()).shutdown(); + inOrder.verify(mockSubchannel4, never()).requestConnection(); + verify(mockSubchannel5, never()).start(stateListenerCaptor.capture()); - // If obselete subchannel becomes ready, the state should not be affected - stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + if (enableHappyEyeballs) { + forwardTimeByConnectionDelay(2); + inOrder.verify(mockSubchannel1, never()).requestConnection(); + inOrder.verify(mockSubchannel4).requestConnection(); + } // First connection attempt is successful stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); // verify that picker returns correct subchannel - inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); } @Test @@ -1199,30 +1274,21 @@ public class PickFirstLeafLoadBalancerTest { Lists.newArrayList(servers.get(0), servers.get(1)); // Accept Addresses and verify proper connection flow - assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + forwardTimeByConnectionDelay(); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // First connection attempt is successful - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockSubchannel1).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockSubchannel2).shutdown(); + // Verify that picker returns correct subchannel inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); // Creating second set of endpoints/addresses List newServers = @@ -1237,9 +1303,6 @@ public class PickFirstLeafLoadBalancerTest { // Verify that picker still returns correct subchannel picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); } @Test @@ -1255,29 +1318,23 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Failing first connection attempt - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Starting second connection attempt - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); inOrder.verify(mockSubchannel2).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Failing second connection attempt - stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); // sticky transient failure assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); @@ -1291,7 +1348,6 @@ public class PickFirstLeafLoadBalancerTest { // subchannel 3 still attempts a connection even though we stay in transient failure assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); inOrder.verify(mockSubchannel3).getAttributes(); inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); inOrder.verify(mockSubchannel3).requestConnection(); @@ -1312,9 +1368,6 @@ public class PickFirstLeafLoadBalancerTest { // Picking a subchannel returns subchannel 3 SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); } @Test @@ -1324,6 +1377,8 @@ public class PickFirstLeafLoadBalancerTest { InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel4); + SubchannelStateListener stateListener3 = null; + // Creating first set of endpoints/addresses List oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); @@ -1331,15 +1386,16 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).requestConnection(); + if (enableHappyEyeballs) { + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + } + // callback from internal subchannel stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); @@ -1352,24 +1408,24 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // We create new channels and remove old ones, keeping intersecting ones - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); - inOrder.verify(mockSubchannel2).shutdown(); - - // If obselete subchannel becomes ready, the state should not be affected - stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + if (enableHappyEyeballs) { + inOrder.verify(mockSubchannel2).shutdown(); + forwardTimeByConnectionDelay(); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + stateListener3 = stateListenerCaptor.getValue(); + } // First connection attempt is unsuccessful - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + if (!enableHappyEyeballs) { + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + stateListener3 = stateListenerCaptor.getValue(); + } // Subchannel 3 attempt starts but fails inOrder.verify(mockSubchannel3).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); } @@ -1384,16 +1440,12 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + inOrder.verify(mockSubchannel1).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // First connection attempt is successful - inOrder.verify(mockSubchannel1).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); @@ -1401,9 +1453,6 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); // Going into IDLE state stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); @@ -1415,9 +1464,9 @@ public class PickFirstLeafLoadBalancerTest { ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); // Verify that no new subchannels were created or started - verify(mockHelper, times(3)).createSubchannel(createArgsCaptor.capture()); + verify(mockHelper, times(1)).createSubchannel(createArgsCaptor.capture()); verify(mockSubchannel1, times(1)).start(stateListenerCaptor.capture()); - verify(mockSubchannel2, times(1)).start(stateListenerCaptor.capture()); + verify(mockSubchannel2, times(0)).start(stateListenerCaptor.capture()); // First connection attempt is successful assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); @@ -1428,15 +1477,12 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); } @Test public void updateAddresses_identical_connecting() { - InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, - mockSubchannel3, mockSubchannel4); + InOrder inOrder = inOrder(mockHelper); + // Creating first set of endpoints/addresses List oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); @@ -1444,24 +1490,40 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + inOrder.verify(mockHelper).createSubchannel(any()); + verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Accept same resolved addresses to update loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + // Verify that started over and found a noop on connecting for first subchannel + inOrder.verify(mockHelper, never()).createSubchannel(any()); + + if (enableHappyEyeballs) { + forwardTimeByConnectionDelay(); + inOrder.verify(mockHelper).createSubchannel(any()); + verify(mockSubchannel2).start(any()); + } + + // Accept same resolved addresses to update - all were connecting, no updateBalancingState + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockHelper, never()).updateBalancingState(any(), any()); + // Verify that no new subchannels were created or started - verify(mockHelper, times(2)).createSubchannel(createArgsCaptor.capture()); - verify(mockSubchannel1, times(1)).start(stateListenerCaptor.capture()); - verify(mockSubchannel2, times(1)).start(stateListenerCaptor.capture()); + inOrder.verify(mockHelper, never()).createSubchannel(any()); + verify(mockSubchannel1, times(1)).start(any()); + if (enableHappyEyeballs) { + verify(mockSubchannel2, times(1)).start(any()); + } else { + verify(mockSubchannel2, times(0)).start(any()); + } // First connection attempt is successful - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); @@ -1469,15 +1531,10 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); } @Test public void updateAddresses_identical_ready() { - InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, - mockSubchannel3, mockSubchannel4); // Creating first set of endpoints/addresses List oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); @@ -1485,40 +1542,27 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // First connection attempt is successful - inOrder.verify(mockSubchannel1).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockSubchannel2).shutdown(); // verify that picker returns correct subchannel - inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); // Accept same resolved addresses to update + reset(mockHelper); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); + fakeClock.forwardTime(CONNECTION_DELAY_INTERVAL_MS, TimeUnit.MILLISECONDS); // Verify that no new subchannels were created or started - verify(mockHelper, times(2)).createSubchannel( - any(CreateSubchannelArgs.class)); - verify(mockSubchannel1, times(1)).start( - any(SubchannelStateListener.class)); - verify(mockSubchannel2, times(1)).start( - any(SubchannelStateListener.class)); + verify(mockSubchannel2,never()).start(any()); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); // verify that picker hasn't changed via checking mock helper's interactions @@ -1536,28 +1580,22 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // First connection attempt is unsuccessful - inOrder.verify(mockSubchannel1).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Second connection attempt is unsuccessful - inOrder.verify(mockSubchannel2).requestConnection(); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(mockHelper).refreshNameResolution(); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + inOrder.verify(mockHelper).refreshNameResolution(); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); // Accept same resolved addresses to update @@ -1570,7 +1608,9 @@ public class PickFirstLeafLoadBalancerTest { verify(mockSubchannel2, times(1)).start(stateListenerCaptor.capture()); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); - // No new connections are requested, subchannels responsible for completing their own backoffs + // No new connections are requested, subchannels responsible for completing their own backoff + verify(mockHelper, atLeast(0)).getSynchronizationContext(); // Don't care + verify(mockHelper, atLeast(0)).getScheduledExecutorService(); verifyNoMoreInteractions(mockHelper); // First connection attempt is successful @@ -1582,9 +1622,6 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); } @Test @@ -1594,28 +1631,19 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Failing first connection attempt - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Starting second connection attempt - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); inOrder.verify(mockSubchannel2).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Second connection attempt is successful stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); @@ -1625,9 +1653,6 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); } @Test @@ -1643,35 +1668,32 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); inOrder.verify(mockSubchannel1).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Failing first connection attempt - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Starting second connection attempt assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); inOrder.verify(mockSubchannel2).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Failing second connection attempt - stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(mockHelper).refreshNameResolution(); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + inOrder.verify(mockHelper).refreshNameResolution(); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); - // Mimic backoff for first address + // backoff for first address + forwardTimeByConnectionDelay(); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); @@ -1680,11 +1702,11 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); // Failing first connection attempt - stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); // Failing second connection attempt - stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); // Mimic backoff for first address @@ -1705,13 +1727,13 @@ public class PickFirstLeafLoadBalancerTest { SubchannelPicker picker = pickerCaptor.getValue(); when(mockSubchannel2.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(0))); assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); // If first subchannel is ready before it completes shutdown, we still choose subchannel 2 - // This can be verified by checking the mock helper. + // This can be verified by checking the mock helper actions after setting it READY. + reset(mockHelper); stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + + // Happy Eyeballs, once transient failure is reported, no longer schedules connections. verifyNoMoreInteractions(mockHelper); } @@ -1731,22 +1753,16 @@ public class PickFirstLeafLoadBalancerTest { loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Failing first connection attempt - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Starting second connection attempt - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel2).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); @@ -1762,9 +1778,6 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); } @Test @@ -1775,45 +1788,38 @@ public class PickFirstLeafLoadBalancerTest { // Creating first set of endpoints/addresses List addrs = Lists.newArrayList(servers.get(0), servers.get(1)); - // Accepting resolved addresses starts all subchannels + // Accepting resolved addresses starts first subchannel and cascades on failures assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); inOrder.verify(mockSubchannel1).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Failing first connection attempt - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Starting second connection attempt - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); inOrder.verify(mockSubchannel2).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Mimic backoff for first address stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Failing second connection attempt - stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(mockHelper).refreshNameResolution(); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - // sticky transient failure + inOrder.verify(mockHelper).refreshNameResolution(); + // sticky transient failure assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); // Failing connection attempt to first address - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); // Mimic backoff for second address @@ -1823,18 +1829,17 @@ public class PickFirstLeafLoadBalancerTest { // Connection attempt to second address is now successful stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockSubchannel1).shutdown(); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); // Verify that picker returns correct subchannel - inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); // If first address is successful, nothing happens. Verify by checking mock helper stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); - verifyNoMoreInteractions(mockHelper); + inOrder.verify(mockHelper,never()).updateBalancingState(any(), any()); + inOrder.verify(mockHelper,never()).createSubchannel(any()); } @Test @@ -1846,28 +1851,32 @@ public class PickFirstLeafLoadBalancerTest { mockSubchannel3, mockSubchannel4); // Creating first set of endpoints/addresses List oldServers = Lists.newArrayList(servers.get(0), servers.get(1)); + SubchannelStateListener stateListener2 = null; // Accept Addresses and verify proper connection flow assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(oldServers).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockSubchannel1).requestConnection(); + forwardTimeByConnectionDelay(); + + if (enableHappyEyeballs) { + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + stateListener2 = stateListenerCaptor.getValue(); + } // First connection attempt is unsuccessful - inOrder.verify(mockSubchannel1).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Second connection attempt is connecting + if (!enableHappyEyeballs) { + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + stateListener2 = stateListenerCaptor.getValue(); + } inOrder.verify(mockSubchannel2).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); @@ -1877,22 +1886,21 @@ public class PickFirstLeafLoadBalancerTest { ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); // Verify that no new subchannels were created or started - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel1).shutdown(); inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); - inOrder.verify(mockSubchannel1).shutdown(); inOrder.verify(mockSubchannel3).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Second address connection attempt is unsuccessful, but should not go into transient failure - stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Third address connection attempt is unsuccessful, now we enter transient failure - stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); - // Obselete subchannels have no impact + // obsolete subchannels have no impact stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); @@ -1905,9 +1913,6 @@ public class PickFirstLeafLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); } @Test @@ -1927,26 +1932,18 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockSubchannel1).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Failing first connection attempt - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Starting second connection attempt - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); inOrder.verify(mockSubchannel2).requestConnection(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Successful second connection attempt stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); @@ -1960,34 +1957,64 @@ public class PickFirstLeafLoadBalancerTest { SubchannelPicker picker = pickerCaptor.getValue(); - // Calling pickSubchannel() requests a connection, gives the same result when called twice. + // Calling pickSubchannel() requests a connection. assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); - inOrder.verify(mockSubchannel3).requestConnection(); - when(mockSubchannel3.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(0))); + inOrder.verify(mockSubchannel1).requestConnection(); + when(mockSubchannel1.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(0))); + + // gives the same result when called twice stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - // first subchannel connection attempt fails - stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + // third subchannel connection attempt fails + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - // second subchannel connection attempt + // second subchannel connection attempt succeeds inOrder.verify(mockSubchannel2).requestConnection(); stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockSubchannel1).shutdown(); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); assertEquals(READY, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockSubchannel3).shutdown(); // verify that picker returns correct subchannel - inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); + } + + @Test + public void shutdown() { + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3, + mockSubchannel4, mockSubchannel5); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).build()); + + forwardTimeByConnectionDelay(servers.size() - 1); + int expectedSubchannelsCreated = enableHappyEyeballs ? servers.size() : 1; + inOrder.verify(mockHelper, times(expectedSubchannelsCreated)).createSubchannel(any()); + + loadBalancer.shutdown(); + + inOrder.verify(mockSubchannel1).shutdown(); + if (enableHappyEyeballs) { + verify(mockSubchannel2).shutdown(); + verify(mockSubchannel3).shutdown(); + verify(mockSubchannel4).shutdown(); + verify(mockSubchannel5).shutdown(); + } + assertEquals(SHUTDOWN, loadBalancer.getConcludedConnectivityState()); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).build()); + forwardTimeByConnectionDelay(); + inOrder.verify(mockHelper, never()).refreshNameResolution(); + inOrder.verify(mockSubchannel1, never()).start(any()); + inOrder.verify(mockSubchannel1, never()).requestConnection(); + inOrder.verify(mockSubchannel2, never()).requestConnection(); } @Test @@ -2003,24 +2030,17 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener = stateListenerCaptor.getValue(); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); - SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); - inOrder.verify(mockSubchannel1).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Failing first connection attempt - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // Starting second connection attempt - assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); inOrder.verify(mockSubchannel2).requestConnection(); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); @@ -2038,17 +2058,16 @@ public class PickFirstLeafLoadBalancerTest { // Calling pickSubchannel() requests a connection, gives the same result when called twice. assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs)); - inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); - inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); - inOrder.verify(mockSubchannel3).requestConnection(); + inOrder.verify(mockSubchannel1).requestConnection(); when(mockSubchannel3.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(0))); stateListener3.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // first subchannel connection attempt fails - stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR)); assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); // second subchannel connection attempt @@ -2057,15 +2076,264 @@ public class PickFirstLeafLoadBalancerTest { assertEquals(READY, loadBalancer.getConcludedConnectivityState()); // verify that picker returns correct subchannel - inOrder.verify(mockSubchannel3).shutdown(); + inOrder.verify(mockSubchannel1).shutdown(); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); picker = pickerCaptor.getValue(); assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel1), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel3), picker.pickSubchannel(mockArgs)); - assertNotEquals(PickResult.withSubchannel(mockSubchannel4), picker.pickSubchannel(mockArgs)); } + @Test + public void happy_eyeballs_trigger_connection_delay() { + Assume.assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs + // Starting first connection attempt + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, + mockSubchannel2, mockSubchannel3, mockSubchannel4); + assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + inOrder.verify(mockSubchannel1).requestConnection(); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + + // Until we hit the connection delay interval threshold, nothing should happen + verifyNoMoreInteractions(mockSubchannel2); + fakeClock.forwardTime(CONNECTION_DELAY_INTERVAL_MS - 1, TimeUnit.MILLISECONDS); + verifyNoMoreInteractions(mockSubchannel2); + + // After 250 ms, second connection attempt starts + fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); + verify(mockHelper, times(2)).createSubchannel(createArgsCaptor.capture()); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + verifyNoMoreInteractions(mockSubchannel3); + + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + + // Second connection attempt is successful + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getConcludedConnectivityState()); + + // Verify that picker returns correct subchannel + inOrder.verify(mockSubchannel1).shutdown(); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + } + + @Test + public void happy_eyeballs_connection_results_happen_after_get_to_end() { + Assume.assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs + + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3); + Status error = Status.UNAUTHENTICATED.withDescription("simulated failure"); + + List addrs = + Lists.newArrayList(servers.get(0), servers.get(1), servers.get(2)); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + forwardTimeByConnectionDelay(2); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener3 = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + + // first connection attempt fails + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + + // Move off the end of the list, but connections requests haven't been completed + forwardTimeByConnectionDelay(); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + + // second connection attempt fails + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + // second connection attempt fails again, but still haven't finished third subchannel + stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockHelper, never()).refreshNameResolution(); + + // last subchannel's connection attempt fails + stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockHelper).refreshNameResolution(); + + + // Refail the first one, after third time should refreshNameResolution + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper, never()).refreshNameResolution(); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper, never()).refreshNameResolution(); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).refreshNameResolution(); + } + + + @Test + public void happy_eyeballs_pick_pushes_index_over_end() { + Assume.assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs + + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3); + Status error = Status.UNAUTHENTICATED.withDescription("simulated failure"); + + List addrs = + Lists.newArrayList(servers.get(0), servers.get(1), servers.get(2)); + Subchannel[] subchannels = new Subchannel[] {mockSubchannel1, mockSubchannel2, mockSubchannel3}; + SubchannelStateListener[] listeners = new SubchannelStateListener[subchannels.length]; + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); + forwardTimeByConnectionDelay(2); + for (int i = 0; i < subchannels.length; i++) { + inOrder.verify(subchannels[i]).start(stateListenerCaptor.capture()); + listeners[i] = stateListenerCaptor.getValue(); + listeners[i].onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + } + assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + SubchannelPicker requestingPicker = pickerCaptor.getValue(); + + // First pick moves index to addr 2 + PickResult pickResult = requestingPicker.pickSubchannel(mockArgs); + assertEquals("RequestConnectionPicker", requestingPicker.getClass().getSimpleName()); + assertEquals(PickResult.withNoResult(), pickResult); + assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); + + // Second pick moves index to addr 3 + pickResult = requestingPicker.pickSubchannel(mockArgs); + assertEquals(PickResult.withNoResult(), pickResult); + + // Sending TF state to one subchannel pushes index past end, but shouldn't do anything + listeners[2].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper, never()).updateBalancingState(eq(TRANSIENT_FAILURE), any()); + + // Put the LB into TF + listeners[0].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + listeners[1].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult pickResultTF = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertFalse(pickResultTF.getStatus().isOk()); + + // Doing a pick on the old RequestConnectionPicker when past the index end + pickResult = requestingPicker.pickSubchannel(mockArgs); + assertEquals(PickResult.withNoResult(), pickResult); + inOrder.verify(mockHelper, never()).updateBalancingState(any(), any()); + + // Try pushing after end with just picks + listeners[0].onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + for (SubchannelStateListener listener : listeners) { + listener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE)); + } + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(affinity).build()); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + SubchannelPicker requestingPicker2 = pickerCaptor.getValue(); + for (int i = 0; i <= subchannels.length; i++) { + pickResult = requestingPicker2.pickSubchannel(mockArgs); + assertEquals(PickResult.withNoResult(), pickResult); + } + assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); + + for (SubchannelStateListener listener : listeners) { + listener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + } + assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState()); + + } + + @Test + public void happy_eyeballs_fail_then_trigger_connection_delay() { + Assume.assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs + // Starting first connection attempt + InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3); + assertEquals(IDLE, loadBalancer.getConcludedConnectivityState()); + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture()); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener = stateListenerCaptor.getValue(); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + + // indicates scheduling a connection + inOrder.verify(mockSubchannel1).requestConnection(); + inOrder.verify(mockHelper).getSynchronizationContext(); + inOrder.verify(mockHelper).getScheduledExecutorService(); + + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + + // Until we hit the connection delay interval threshold, no connections should be requested + verify(mockSubchannel1, times(1)).requestConnection(); + verify(mockSubchannel2, times(0)).requestConnection(); + fakeClock.forwardTime(CONNECTION_DELAY_INTERVAL_MS - 1, TimeUnit.MILLISECONDS); + verify(mockSubchannel1, times(1)).requestConnection(); + verify(mockSubchannel2, times(0)).requestConnection(); + + // If a connection fails, the next scheduled connection is reset to happen 250 ms later + Status error = Status.UNAUTHENTICATED.withDescription("simulated failure"); + stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + verify(mockSubchannel2, times(1)).requestConnection(); + + // This time, after 1 ms, no connection attempt occurs + fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); + inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture()); + SubchannelStateListener stateListener2 = stateListenerCaptor.getValue(); + verify(mockSubchannel1, times(1)).requestConnection(); + verify(mockSubchannel2, times(1)).requestConnection(); + + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + verify(mockSubchannel3, times(0)).requestConnection(); + + // After 250 ms, second connection attempt starts + // Skip subchannel 2 and request to address 3 + fakeClock.forwardTime(CONNECTION_DELAY_INTERVAL_MS - 1, TimeUnit.MILLISECONDS); + verify(mockSubchannel1, times(1)).requestConnection(); + verify(mockSubchannel2, times(1)).requestConnection(); + verify(mockSubchannel3, times(1)).requestConnection(); + fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); + inOrder.verify(mockSubchannel2).requestConnection(); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + + // Simulate first connection attempt coming out of backoff + stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); + assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState()); + + // Both subchannels racing, second connection attempt is successful + stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY)); + assertEquals(READY, loadBalancer.getConcludedConnectivityState()); + + // Verify that picker returns correct subchannel + inOrder.verify(mockSubchannel1).shutdown(); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + assertEquals(PickResult.withSubchannel(mockSubchannel2), picker.pickSubchannel(mockArgs)); + } + + @Test + public void advance_index_then_request_connection() { + loadBalancer.requestConnection(); // should be handled without throwing exception + + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build()); + forwardTimeByConnectionDelay(servers.size()); + + loadBalancer.requestConnection(); // should be handled without throwing exception + } @Test public void index_looping() { @@ -2144,7 +2412,7 @@ public class PickFirstLeafLoadBalancerTest { index.increment(); index.increment(); // We want to make sure both groupIndex and addressIndex are reset - index.updateGroups(Arrays.asList( + index.updateGroups(ImmutableList.of( new EquivalentAddressGroup(Arrays.asList(addr1)), new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1); @@ -2183,11 +2451,34 @@ public class PickFirstLeafLoadBalancerTest { } + private void forwardTimeByConnectionDelay() { + fakeClock.forwardTime(CONNECTION_DELAY_INTERVAL_MS, TimeUnit.MILLISECONDS); + } + + private void forwardTimeByConnectionDelay(int times) { + for (int i = 0; i < times; i++) { + forwardTimeByConnectionDelay(); + } + } + + private void acceptXSubchannels(int num) { + List newServers = new ArrayList<>(); + for (int i = 0; i < num; i++) { + newServers.add(servers.get(i)); + } + loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build()); + } + + /** + * This is currently only used for mocks, but could be used in a real test. + */ private static class FakeSubchannel extends Subchannel { private final Attributes attributes; private List eags; private SubchannelStateListener listener; + @Keep public FakeSubchannel(List eags, Attributes attributes) { this.eags = Collections.unmodifiableList(eags); this.attributes = attributes; @@ -2222,4 +2513,50 @@ public class PickFirstLeafLoadBalancerTest { listener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING)); } } + + private class MockHelperImpl extends LoadBalancer.Helper { + @Override + public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { + return null; + } + + @Override + public String getAuthority() { + return null; + } + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + // ignore + } + + @Override + public SynchronizationContext getSynchronizationContext() { + return syncContext; + } + + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return fakeClock.getScheduledExecutorService(); + } + + @Override + public void refreshNameResolution() { + // noop + } + + @Override + public Subchannel createSubchannel(CreateSubchannelArgs args) { + SocketAddress addr = args.getAddresses().get(0).getAddresses().get(0); + List fakeSubchannels = + Arrays.asList(mockSubchannel1, mockSubchannel2, mockSubchannel3, mockSubchannel4, + mockSubchannel5); + for (int i = 1; i <= 5; i++) { + if (addr.toString().equals(new FakeSocketAddress("server" + i).toString())) { + return fakeSubchannels.get(i - 1); + } + } + throw new IllegalArgumentException("Unexpected address: " + addr); + } + } } \ No newline at end of file