Change PickFirstLeafLoadBalancer to only have 1 subchannel at a time (#11520)

* Change PickFirstLeafLoadBalancer to only have 1 subchannel at a time if environment variable GRPC_SERIALIZE_RETRIES == true.

Cache serializingRetries value so that it doesn't have to look up the flag every time.

Clear the correct task when READY in processSubchannelState and move the logic to cancelScheduledTasks

Cleanup based on PR review

remove unneeded checks for shutdown.

* Fix previously broken tests

* Shutdown previous subchannel when run off end of index.

* Provide option to disable subchannel retries to let PFLeafLB take control of retries.

* InternalSubchannel internally goes to IDLE when sees TF when reconnect is disabled.
Remove an extra index.increment in LeafLB
This commit is contained in:
Larry Safran 2024-10-02 17:03:47 -07:00 committed by GitHub
parent 6f3542297c
commit 9bb06af963
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 291 additions and 52 deletions

View File

@ -121,6 +121,12 @@ public abstract class LoadBalancer {
HEALTH_CONSUMER_LISTENER_ARG_KEY =
LoadBalancer.CreateSubchannelArgs.Key.create("internal:health-check-consumer-listener");
@Internal
public static final LoadBalancer.CreateSubchannelArgs.Key<Boolean>
DISABLE_SUBCHANNEL_RECONNECT_KEY =
LoadBalancer.CreateSubchannelArgs.Key.createWithDefault(
"internal:disable-subchannel-reconnect", Boolean.FALSE);
@Internal
public static final Attributes.Key<Boolean>
HAS_HEALTH_PRODUCER_LISTENER_KEY =

View File

@ -45,6 +45,7 @@ import io.grpc.InternalChannelz.ChannelStats;
import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
@ -77,6 +78,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
private final CallTracer callsTracer;
private final ChannelTracer channelTracer;
private final ChannelLogger channelLogger;
private final boolean reconnectDisabled;
private final List<ClientTransportFilter> transportFilters;
@ -159,13 +161,15 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
private volatile Attributes connectedAddressAttributes;
InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent,
InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext, Callback callback,
InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer,
InternalLogId logId, ChannelLogger channelLogger,
List<ClientTransportFilter> transportFilters) {
ClientTransportFactory transportFactory,
ScheduledExecutorService scheduledExecutor,
Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext,
Callback callback, InternalChannelz channelz, CallTracer callsTracer,
ChannelTracer channelTracer, InternalLogId logId,
ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters) {
List<EquivalentAddressGroup> addressGroups = args.getAddresses();
Preconditions.checkNotNull(addressGroups, "addressGroups");
Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
@ -187,6 +191,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
this.logId = Preconditions.checkNotNull(logId, "logId");
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.transportFilters = transportFilters;
this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
}
ChannelLogger getChannelLogger() {
@ -289,6 +294,11 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
}
gotoState(ConnectivityStateInfo.forTransientFailure(status));
if (reconnectDisabled) {
return;
}
if (reconnectPolicy == null) {
reconnectPolicy = backoffPolicyProvider.get();
}
@ -337,7 +347,11 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
if (state.getState() != newState.getState()) {
Preconditions.checkState(state.getState() != SHUTDOWN,
"Cannot transition out of SHUTDOWN to " + newState);
if (reconnectDisabled && newState.getState() == TRANSIENT_FAILURE) {
state = ConnectivityStateInfo.forNonError(IDLE);
} else {
state = newState;
}
callback.onStateChange(InternalSubchannel.this, newState);
}
}

View File

@ -1483,7 +1483,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroup,
CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(),
authority, userAgent, backoffPolicyProvider, oobTransportFactory,
oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
// All callback methods are run from syncContext
@ -1915,7 +1915,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
final InternalSubchannel internalSubchannel = new InternalSubchannel(
args.getAddresses(),
args,
authority(),
userAgent,
backoffPolicyProvider,

View File

@ -64,17 +64,26 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
private int numTf = 0;
private boolean firstPass = true;
@Nullable
private ScheduledHandle scheduleConnectionTask;
private ScheduledHandle scheduleConnectionTask = null;
private ConnectivityState rawConnectivityState = IDLE;
private ConnectivityState concludedState = IDLE;
private final boolean enableHappyEyeballs =
PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private final boolean enableHappyEyeballs = !isSerializingRetries()
&& PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private boolean notAPetiolePolicy = true; // means not under a petiole policy
private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
private BackoffPolicy reconnectPolicy;
@Nullable
private ScheduledHandle reconnectTask = null;
private final boolean serializingRetries = isSerializingRetries();
PickFirstLeafLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
}
static boolean isSerializingRetries() {
return GrpcUtil.getFlag("GRPC_SERIALIZE_RETRIES", false);
}
@Override
public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
if (rawConnectivityState == SHUTDOWN) {
@ -225,9 +234,10 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
return;
}
if (newState == IDLE) {
if (newState == IDLE && subchannelData.state == READY) {
helper.refreshNameResolution();
}
// If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
// transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
// transient failure". Only a subchannel state change to READY will get the LB out of
@ -277,6 +287,8 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
if (addressIndex.increment()) {
cancelScheduleTask();
requestConnection(); // is recursive so might hit the end of the addresses
} else {
scheduleBackoff();
}
}
@ -304,6 +316,39 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
}
}
/**
* Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
*/
private void scheduleBackoff() {
if (!serializingRetries) {
return;
}
class EndOfCurrentBackoff implements Runnable {
@Override
public void run() {
reconnectTask = null;
addressIndex.reset();
requestConnection();
}
}
// Just allow the previous one to trigger when ready if we're already in backoff
if (reconnectTask != null) {
return;
}
if (reconnectPolicy == null) {
reconnectPolicy = bkoffPolProvider.get();
}
long delayNanos = reconnectPolicy.nextBackoffNanos();
reconnectTask = helper.getSynchronizationContext().schedule(
new EndOfCurrentBackoff(),
delayNanos,
TimeUnit.NANOSECONDS,
helper.getScheduledExecutorService());
}
private void updateHealthCheckedState(SubchannelData subchannelData) {
if (subchannelData.state != READY) {
return;
@ -337,6 +382,11 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
rawConnectivityState = SHUTDOWN;
concludedState = SHUTDOWN;
cancelScheduleTask();
if (reconnectTask != null) {
reconnectTask.cancel();
reconnectTask = null;
}
reconnectPolicy = null;
for (SubchannelData subchannelData : subchannels.values()) {
subchannelData.getSubchannel().shutdown();
@ -350,6 +400,12 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
* that all other subchannels must be shutdown.
*/
private void shutdownRemaining(SubchannelData activeSubchannelData) {
if (reconnectTask != null) {
reconnectTask.cancel();
reconnectTask = null;
}
reconnectPolicy = null;
cancelScheduleTask();
for (SubchannelData subchannelData : subchannels.values()) {
if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) {
@ -391,8 +447,17 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
scheduleNextConnection();
break;
case TRANSIENT_FAILURE:
if (!serializingRetries) {
addressIndex.increment();
requestConnection();
} else {
if (!addressIndex.isValid()) {
scheduleBackoff();
} else {
subchannelData.subchannel.requestConnection();
subchannelData.updateState(CONNECTING);
}
}
break;
default:
// Wait for current subchannel to change state
@ -441,6 +506,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
.setAddresses(Lists.newArrayList(
new EquivalentAddressGroup(addr, attrs)))
.addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
.addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, serializingRetries)
.build());
if (subchannel == null) {
log.warning("Was not able to create subchannel for " + addr);
@ -458,7 +524,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
}
private boolean isPassComplete() {
if (addressIndex.isValid() || subchannels.size() < addressIndex.size()) {
if (subchannels.size() < addressIndex.size()) {
return false;
}
for (SubchannelData sc : subchannels.values()) {
@ -646,6 +712,16 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
}
}
@VisibleForTesting
int getGroupIndex() {
return addressIndex.groupIndex;
}
@VisibleForTesting
boolean isIndexValid() {
return addressIndex.isValid();
}
private static final class SubchannelData {
private final Subchannel subchannel;
private ConnectivityState state;

View File

@ -46,6 +46,7 @@ import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalChannelz;
import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.InternalSubchannel.CallTracingTransport;
@ -309,10 +310,57 @@ public class InternalSubchannelTest {
verify(mockBackoffPolicy2, times(backoff2Consulted)).nextBackoffNanos();
}
@Test public void twoAddressesReconnectDisabled() {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
createInternalSubchannel(true,
new EquivalentAddressGroup(Arrays.asList(addr1, addr2)));
assertEquals(IDLE, internalSubchannel.getState());
assertNull(internalSubchannel.obtainActiveTransport());
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory).newClientTransport(eq(addr1), any(), any());
// Let this one fail without success
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Still in CONNECTING
assertNull(internalSubchannel.obtainActiveTransport());
assertNoCallbackInvoke();
assertEquals(CONNECTING, internalSubchannel.getState());
// Second attempt will start immediately. Still no back-off policy.
verify(mockBackoffPolicyProvider, times(0)).get();
verify(mockTransportFactory, times(1))
.newClientTransport(
eq(addr2),
eq(createClientTransportOptions()),
isA(TransportLogger.class));
assertNull(internalSubchannel.obtainActiveTransport());
// Fail this one too
assertNoCallbackInvoke();
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// All addresses have failed, but we aren't controlling retries.
assertEquals(IDLE, internalSubchannel.getState());
assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
// Backoff reset and first back-off interval begins
verify(mockBackoffPolicy1, never()).nextBackoffNanos();
verify(mockBackoffPolicyProvider, never()).get();
assertTrue("Nothing should have been scheduled", fakeClock.getPendingTasks().isEmpty());
// Should follow orders and create an active transport.
internalSubchannel.obtainActiveTransport();
assertExactCallbackInvokes("onStateChange:CONNECTING");
assertEquals(CONNECTING, internalSubchannel.getState());
// Shouldn't have anything scheduled, so shouldn't do anything
assertTrue("Nothing should have been scheduled 2", fakeClock.getPendingTasks().isEmpty());
}
@Test public void twoAddressesReconnect() {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
createInternalSubchannel(addr1, addr2);
createInternalSubchannel(false,
new EquivalentAddressGroup(Arrays.asList(addr1, addr2)));
assertEquals(IDLE, internalSubchannel.getState());
// Invocation counters
int transportsAddr1 = 0;
@ -1377,11 +1425,24 @@ public class InternalSubchannelTest {
}
private void createInternalSubchannel(EquivalentAddressGroup ... addrs) {
createInternalSubchannel(false, addrs);
}
private void createInternalSubchannel(boolean reconnectDisabled,
EquivalentAddressGroup ... addrs) {
List<EquivalentAddressGroup> addressGroups = Arrays.asList(addrs);
InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY);
ChannelTracer subchannelTracer = new ChannelTracer(logId, 10,
fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel");
internalSubchannel = new InternalSubchannel(addressGroups, AUTHORITY, USER_AGENT,
LoadBalancer.CreateSubchannelArgs.Builder argBuilder =
LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups);
if (reconnectDisabled) {
argBuilder.addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, reconnectDisabled);
}
LoadBalancer.CreateSubchannelArgs createSubchannelArgs = argBuilder.build();
internalSubchannel = new InternalSubchannel(
createSubchannelArgs,
AUTHORITY, USER_AGENT,
mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(),
fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback,
channelz, CallTracer.getDefaultFactory().create(),

View File

@ -27,10 +27,12 @@ import static io.grpc.LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY;
import static io.grpc.LoadBalancer.HEALTH_CONSUMER_LISTENER_ARG_KEY;
import static io.grpc.LoadBalancer.IS_PETIOLE_POLICY;
import static io.grpc.internal.PickFirstLeafLoadBalancer.CONNECTION_DELAY_INTERVAL_MS;
import static io.grpc.internal.PickFirstLeafLoadBalancer.isSerializingRetries;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assume.assumeTrue;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@ -73,7 +75,6 @@ import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -92,14 +93,22 @@ import org.mockito.junit.MockitoRule;
public class PickFirstLeafLoadBalancerTest {
public static final Status CONNECTION_ERROR =
Status.UNAVAILABLE.withDescription("Simulated connection error");
public static final String GRPC_SERIALIZE_RETRIES = "GRPC_SERIALIZE_RETRIES";
@Parameterized.Parameters(name = "{0}")
public static List<Boolean> enableHappyEyeballs() {
return Arrays.asList(true, false);
@Parameterized.Parameters(name = "{0}-{1}")
public static List<Object[]> data() {
return Arrays.asList(new Object[][] {
{false, false},
{false, true},
{true, false}});
}
@Parameterized.Parameter
@Parameterized.Parameter(value = 0)
public boolean serializeRetries;
@Parameterized.Parameter(value = 1)
public boolean enableHappyEyeballs;
private PickFirstLeafLoadBalancer loadBalancer;
private final List<EquivalentAddressGroup> servers = Lists.newArrayList();
private static final Attributes.Key<String> FOO = Attributes.Key.create("foo");
@ -137,13 +146,22 @@ public class PickFirstLeafLoadBalancerTest {
private PickSubchannelArgs mockArgs;
private String originalHappyEyeballsEnabledValue;
private String originalSerializeRetriesValue;
private long backoffMillis;
@Before
public void setUp() {
assumeTrue(!serializeRetries || !enableHappyEyeballs); // they are not compatible
backoffMillis = TimeUnit.SECONDS.toMillis(1);
originalSerializeRetriesValue = System.getProperty(GRPC_SERIALIZE_RETRIES);
System.setProperty(GRPC_SERIALIZE_RETRIES, Boolean.toString(serializeRetries));
originalHappyEyeballsEnabledValue =
System.getProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS);
System.setProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS,
enableHappyEyeballs ? "true" : "false");
Boolean.toString(enableHappyEyeballs));
for (int i = 1; i <= 5; i++) {
SocketAddress addr = new FakeSocketAddress("server" + i);
@ -176,6 +194,11 @@ public class PickFirstLeafLoadBalancerTest {
@After
public void tearDown() {
if (originalSerializeRetriesValue == null) {
System.clearProperty(GRPC_SERIALIZE_RETRIES);
} else {
System.setProperty(GRPC_SERIALIZE_RETRIES, originalSerializeRetriesValue);
}
if (originalHappyEyeballsEnabledValue == null) {
System.clearProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS);
} else {
@ -498,6 +521,9 @@ public class PickFirstLeafLoadBalancerTest {
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs)
.getSubchannel()).isSameInstanceAs(mockSubchannel1);
verify(mockHelper, atLeast(0)).getSynchronizationContext();
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);
healthListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
verifyNoMoreInteractions(mockHelper);
@ -520,20 +546,7 @@ public class PickFirstLeafLoadBalancerTest {
inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture());
stateListeners[0] = stateListenerCaptor.getValue();
if (enableHappyEyeballs) {
forwardTimeByConnectionDelay();
inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture());
stateListeners[1] = stateListenerCaptor.getValue();
forwardTimeByConnectionDelay();
inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture());
stateListeners[2] = stateListenerCaptor.getValue();
forwardTimeByConnectionDelay();
inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture());
stateListeners[3] = stateListenerCaptor.getValue();
}
reset(mockHelper);
stateListeners[0].onSubchannelState(ConnectivityStateInfo.forNonError(READY));
stateListeners[0].onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).refreshNameResolution();
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
@ -543,11 +556,23 @@ public class PickFirstLeafLoadBalancerTest {
stateListeners[0].onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
Status error = Status.UNAVAILABLE.withDescription("boom!");
reset(mockHelper);
if (enableHappyEyeballs) {
for (SubchannelStateListener listener : stateListeners) {
listener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
}
stateListeners[0].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
forwardTimeByConnectionDelay();
inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture());
stateListeners[1] = stateListenerCaptor.getValue();
stateListeners[1].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
forwardTimeByConnectionDelay();
inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture());
stateListeners[2] = stateListenerCaptor.getValue();
stateListeners[2].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
forwardTimeByConnectionDelay();
inOrder.verify(mockSubchannel4).start(stateListenerCaptor.capture());
stateListeners[3] = stateListenerCaptor.getValue();
stateListeners[3].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
forwardTimeByConnectionDelay();
} else {
stateListeners[0].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
for (int i = 1; i < stateListeners.length; i++) {
@ -589,6 +614,8 @@ public class PickFirstLeafLoadBalancerTest {
// Transition from TRANSIENT_ERROR to CONNECTING should also be ignored.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
verify(mockHelper, atLeast(0)).getSynchronizationContext();
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
}
@ -619,6 +646,8 @@ public class PickFirstLeafLoadBalancerTest {
// Transition from TRANSIENT_ERROR to CONNECTING should also be ignored.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
verify(mockHelper, atLeast(0)).getSynchronizationContext();
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
@ -651,6 +680,8 @@ public class PickFirstLeafLoadBalancerTest {
// Transition from TRANSIENT_ERROR to CONNECTING should also be ignored.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
verify(mockHelper, atLeast(0)).getSynchronizationContext();
verify(mockHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(mockHelper);
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
@ -1518,6 +1549,8 @@ public class PickFirstLeafLoadBalancerTest {
@Test
public void updateAddresses_intersecting_transient_failure() {
assumeTrue(!isSerializingRetries());
// Starting first connection attempt
InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2,
mockSubchannel3, mockSubchannel4); // captor: captures
@ -1782,6 +1815,8 @@ public class PickFirstLeafLoadBalancerTest {
@Test
public void updateAddresses_identical_transient_failure() {
assumeTrue(!isSerializingRetries());
InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2,
mockSubchannel3, mockSubchannel4);
// Creating first set of endpoints/addresses
@ -2295,7 +2330,7 @@ public class PickFirstLeafLoadBalancerTest {
@Test
public void happy_eyeballs_trigger_connection_delay() {
Assume.assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs
assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs
// Starting first connection attempt
InOrder inOrder = inOrder(mockHelper, mockSubchannel1,
mockSubchannel2, mockSubchannel3, mockSubchannel4);
@ -2340,7 +2375,7 @@ public class PickFirstLeafLoadBalancerTest {
@Test
public void happy_eyeballs_connection_results_happen_after_get_to_end() {
Assume.assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs
assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs
InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3);
Status error = Status.UNAUTHENTICATED.withDescription("simulated failure");
@ -2393,7 +2428,7 @@ public class PickFirstLeafLoadBalancerTest {
@Test
public void happy_eyeballs_pick_pushes_index_over_end() {
Assume.assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs
assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs
InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3,
mockSubchannel2n2, mockSubchannel3n2);
@ -2471,7 +2506,7 @@ public class PickFirstLeafLoadBalancerTest {
@Test
public void happy_eyeballs_fail_then_trigger_connection_delay() {
Assume.assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs
assumeTrue(enableHappyEyeballs); // This test is only for happy eyeballs
// Starting first connection attempt
InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3);
assertEquals(IDLE, loadBalancer.getConcludedConnectivityState());
@ -2550,6 +2585,44 @@ public class PickFirstLeafLoadBalancerTest {
loadBalancer.requestConnection(); // should be handled without throwing exception
}
@Test
public void serialized_retries_two_passes() {
assumeTrue(serializeRetries); // This test is only for serialized retries
InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2, mockSubchannel3);
Status error = Status.UNAUTHENTICATED.withDescription("simulated failure");
List<EquivalentAddressGroup> addrs =
Lists.newArrayList(servers.get(0), servers.get(1), servers.get(2));
Subchannel[] subchannels = new Subchannel[]{mockSubchannel1, mockSubchannel2, mockSubchannel3};
SubchannelStateListener[] listeners = new SubchannelStateListener[subchannels.length];
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(addrs).build());
forwardTimeByConnectionDelay(2);
for (int i = 0; i < subchannels.length; i++) {
inOrder.verify(subchannels[i]).start(stateListenerCaptor.capture());
inOrder.verify(subchannels[i]).requestConnection();
listeners[i] = stateListenerCaptor.getValue();
listeners[i].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error));
}
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
assertFalse("Index should be at end", loadBalancer.isIndexValid());
forwardTimeByBackoffDelay(); // should trigger retry
for (int i = 0; i < subchannels.length; i++) {
inOrder.verify(subchannels[i]).requestConnection();
listeners[i].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); // cascade
}
inOrder.verify(subchannels[0], never()).requestConnection(); // should wait for backoff delay
forwardTimeByBackoffDelay(); // should trigger retry again
for (int i = 0; i < subchannels.length; i++) {
inOrder.verify(subchannels[i]).requestConnection();
assertEquals(i, loadBalancer.getGroupIndex());
listeners[i].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); // cascade
}
}
@Test
public void index_looping() {
Attributes.Key<String> key = Attributes.Key.create("some-key");
@ -2689,6 +2762,11 @@ public class PickFirstLeafLoadBalancerTest {
}
}
private void forwardTimeByBackoffDelay() {
backoffMillis = (long) (backoffMillis * 1.8); // backoff factor default is 1.6 with Jitter .2
fakeClock.forwardTime(backoffMillis, TimeUnit.MILLISECONDS);
}
private void acceptXSubchannels(int num) {
List<EquivalentAddressGroup> newServers = new ArrayList<>();
for (int i = 0; i < num; i++) {

View File

@ -150,7 +150,8 @@ public class RingHashLoadBalancerTest {
assertThat(result.getStatus().isOk()).isTrue();
assertThat(result.getSubchannel()).isNull();
Subchannel subchannel = Iterables.getOnlyElement(subchannels.values());
int expectedTimes = PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2;
int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst()
&& !PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2;
verify(subchannel, times(expectedTimes)).requestConnection();
verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
@ -184,7 +185,8 @@ public class RingHashLoadBalancerTest {
pickerCaptor.getValue().pickSubchannel(args);
Subchannel subchannel = subchannels.get(Collections.singletonList(childLbState.getEag()));
InOrder inOrder = Mockito.inOrder(helper, subchannel);
int expectedTimes = PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2;
int expectedTimes = PickFirstLoadBalancerProvider.isEnabledHappyEyeballs()
|| !PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 2 : 1;
inOrder.verify(subchannel, times(expectedTimes)).requestConnection();
deliverSubchannelState(subchannel, CSI_READY);
inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class));
@ -443,8 +445,10 @@ public class RingHashLoadBalancerTest {
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isTrue();
assertThat(result.getSubchannel()).isNull(); // buffer request
int expectedTimes = PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2;
// verify kicked off connection to server2
int expectedTimes = PickFirstLoadBalancerProvider.isEnabledHappyEyeballs()
|| !PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 2 : 1;
verify(getSubChannel(servers.get(1)), times(expectedTimes)).requestConnection();
assertThat(subchannels.size()).isEqualTo(2); // no excessive connection