diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index aaf7cea3c7..1ca62a91d2 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -954,6 +954,9 @@ public final class GrpcUtil { if (envVar == null) { envVar = System.getProperty(envVarName); } + if (envVar != null) { + envVar = envVar.trim(); + } if (enableByDefault) { return Strings.isNullOrEmpty(envVar) || Boolean.parseBoolean(envVar); } else { diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index 3d6fadeffd..47ea53c20d 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -131,8 +131,14 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { // If the previous ready subchannel exists in new address list, // keep this connection and don't create new subchannels SocketAddress previousAddress = addressIndex.getCurrentAddress(); + Attributes prevEagAttrs = addressIndex.getCurrentEagAttributes(); addressIndex.updateGroups(newImmutableAddressGroups); if (addressIndex.seekTo(previousAddress)) { + if (!addressIndex.getCurrentEagAttributes().equals(prevEagAttrs)) { + log.log(Level.FINE, "EAG attributes changed, need to update subchannel"); + SubchannelData subchannelData = subchannels.get(previousAddress); + subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList()); + } return Status.OK; } else { addressIndex.reset(); // Previous ready subchannel not in the new list of addresses @@ -354,7 +360,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { currentAddress = addressIndex.getCurrentAddress(); subchannel = subchannels.containsKey(currentAddress) ? subchannels.get(currentAddress).getSubchannel() - : createNewSubchannel(currentAddress); + : createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes()); ConnectivityState subchannelState = subchannels.get(currentAddress).getState(); switch (subchannelState) { @@ -418,12 +424,12 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { } } - private Subchannel createNewSubchannel(SocketAddress addr) { + private Subchannel createNewSubchannel(SocketAddress addr, Attributes attrs) { HealthListener hcListener = new HealthListener(); final Subchannel subchannel = helper.createSubchannel( CreateSubchannelArgs.newBuilder() .setAddresses(Lists.newArrayList( - new EquivalentAddressGroup(addr))) + new EquivalentAddressGroup(addr, attrs))) .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener) .build()); if (subchannel == null) { @@ -433,8 +439,8 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { SubchannelData subchannelData = new SubchannelData(subchannel, IDLE, hcListener); hcListener.subchannelData = subchannelData; subchannels.put(addr, subchannelData); - Attributes attrs = subchannel.getAttributes(); - if (attrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) { + Attributes scAttrs = subchannel.getAttributes(); + if (scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) { hcListener.healthStateInfo = ConnectivityStateInfo.forNonError(READY); } subchannel.start(stateInfo -> processSubchannelState(subchannel, stateInfo)); @@ -584,6 +590,11 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer { return addressGroups.get(groupIndex).getAttributes(); } + public List getCurrentEagAsList() { + return Collections.singletonList( + new EquivalentAddressGroup(getCurrentAddress(), getCurrentEagAttributes())); + } + /** * Update to new groups, resetting the current index. */ diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java index 27f25e78e1..69b49d2215 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java @@ -16,12 +16,13 @@ package io.grpc.internal; -import com.google.common.base.Strings; +import com.google.common.annotations.VisibleForTesting; import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; import io.grpc.NameResolver; import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; +import io.grpc.internal.PickFirstLeafLoadBalancer.PickFirstLeafLoadBalancerConfig; import io.grpc.internal.PickFirstLoadBalancer.PickFirstLoadBalancerConfig; import java.util.Map; @@ -35,8 +36,7 @@ public final class PickFirstLoadBalancerProvider extends LoadBalancerProvider { private static final String SHUFFLE_ADDRESS_LIST_KEY = "shuffleAddressList"; static boolean enableNewPickFirst = - !Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST")) - && Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST")); + GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST", false); @Override public boolean isAvailable() { @@ -63,16 +63,28 @@ public final class PickFirstLoadBalancerProvider extends LoadBalancerProvider { } @Override - public ConfigOrError parseLoadBalancingPolicyConfig( - Map rawLoadBalancingPolicyConfig) { + public ConfigOrError parseLoadBalancingPolicyConfig(Map rawLbPolicyConfig) { try { - return ConfigOrError.fromConfig( - new PickFirstLoadBalancerConfig(JsonUtil.getBoolean(rawLoadBalancingPolicyConfig, - SHUFFLE_ADDRESS_LIST_KEY))); + Object config = getLbPolicyConfig(rawLbPolicyConfig); + return ConfigOrError.fromConfig(config); } catch (RuntimeException e) { return ConfigOrError.fromError( Status.UNAVAILABLE.withCause(e).withDescription( "Failed parsing configuration for " + getPolicyName())); } } + + private static Object getLbPolicyConfig(Map rawLbPolicyConfig) { + Boolean shuffleAddressList = JsonUtil.getBoolean(rawLbPolicyConfig, SHUFFLE_ADDRESS_LIST_KEY); + if (enableNewPickFirst) { + return new PickFirstLeafLoadBalancerConfig(shuffleAddressList); + } else { + return new PickFirstLoadBalancerConfig(shuffleAddressList); + } + } + + @VisibleForTesting + public static boolean isEnabledNewPickFirst() { + return enableNewPickFirst; + } } diff --git a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java index 3bd98d39d3..871ee4e942 100644 --- a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java +++ b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java @@ -49,6 +49,7 @@ import io.grpc.LoadBalancerRegistry; import io.grpc.NameResolver.ConfigOrError; import io.grpc.Status; import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer; +import io.grpc.internal.PickFirstLeafLoadBalancer.PickFirstLeafLoadBalancerConfig; import io.grpc.internal.PickFirstLoadBalancer.PickFirstLoadBalancerConfig; import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.util.ForwardingLoadBalancerHelper; @@ -95,6 +96,11 @@ public class AutoConfiguredLoadBalancerFactoryTest { delegatesTo( new FakeLoadBalancerProvider("test_lb2", testLbBalancer2, nextParsedConfigOrError2))); + private final Class pfLbClass = + PickFirstLoadBalancerProvider.enableNewPickFirst + ? PickFirstLeafLoadBalancer.class + : PickFirstLoadBalancer.class; + @Before public void setUp() { when(testLbBalancer.acceptResolvedAddresses(isA(ResolvedAddresses.class))).thenReturn( @@ -429,7 +435,7 @@ public class AutoConfiguredLoadBalancerFactoryTest { .setLoadBalancingPolicyConfig(null) .build()); assertThat(addressesAcceptanceStatus.isOk()).isTrue(); - assertThat(lb.getDelegate()).isInstanceOf(PickFirstLoadBalancer.class); + assertThat(lb.getDelegate()).isInstanceOf(pfLbClass); } @Test @@ -484,7 +490,7 @@ public class AutoConfiguredLoadBalancerFactoryTest { verify(channelLogger).log( eq(ChannelLogLevel.INFO), eq("Load balancer changed from {0} to {1}"), - eq("PickFirstLoadBalancer"), + eq(pfLbClass.getSimpleName()), eq(testLbBalancer.getClass().getSimpleName())); verify(channelLogger).log( @@ -628,8 +634,15 @@ public class AutoConfiguredLoadBalancerFactoryTest { assertThat(parsed.getConfig()).isNotNull(); PolicySelection policySelection = (PolicySelection) parsed.getConfig(); assertThat(policySelection.provider).isInstanceOf(PickFirstLoadBalancerProvider.class); - assertThat(policySelection.config).isInstanceOf(PickFirstLoadBalancerConfig.class); - assertThat(((PickFirstLoadBalancerConfig) policySelection.config).shuffleAddressList).isTrue(); + if (PickFirstLoadBalancerProvider.isEnabledNewPickFirst()) { + assertThat(policySelection.config).isInstanceOf(PickFirstLeafLoadBalancerConfig.class); + assertThat(((PickFirstLeafLoadBalancerConfig) policySelection.config).shuffleAddressList) + .isTrue(); + } else { + assertThat(policySelection.config).isInstanceOf(PickFirstLoadBalancerConfig.class); + assertThat(((PickFirstLoadBalancerConfig) policySelection.config).shuffleAddressList) + .isTrue(); + } verifyNoInteractions(channelLogger); } diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerProviderTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerProviderTest.java index 3aa9b1872c..7844aebd3f 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerProviderTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerProviderTest.java @@ -19,6 +19,7 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; import io.grpc.NameResolver.ConfigOrError; +import io.grpc.internal.PickFirstLeafLoadBalancer.PickFirstLeafLoadBalancerConfig; import io.grpc.internal.PickFirstLoadBalancer.PickFirstLoadBalancerConfig; import java.util.HashMap; import java.util.Map; @@ -35,10 +36,23 @@ public class PickFirstLoadBalancerProviderTest { rawConfig.put("shuffleAddressList", true); ConfigOrError parsedConfig = new PickFirstLoadBalancerProvider().parseLoadBalancingPolicyConfig( rawConfig); - PickFirstLoadBalancerConfig config = (PickFirstLoadBalancerConfig) parsedConfig.getConfig(); - assertThat(config.shuffleAddressList).isTrue(); - assertThat(config.randomSeed).isNull(); + Boolean shuffleAddressList; + Long randomSeed; + + if (PickFirstLoadBalancerProvider.isEnabledNewPickFirst()) { + PickFirstLeafLoadBalancerConfig config = + (PickFirstLeafLoadBalancerConfig) parsedConfig.getConfig(); + shuffleAddressList = config.shuffleAddressList; + randomSeed = config.randomSeed; + } else { + PickFirstLoadBalancerConfig config = (PickFirstLoadBalancerConfig) parsedConfig.getConfig(); + shuffleAddressList = config.shuffleAddressList; + randomSeed = config.randomSeed; + } + + assertThat(shuffleAddressList).isTrue(); + assertThat(randomSeed).isNull(); } @Test @@ -46,9 +60,22 @@ public class PickFirstLoadBalancerProviderTest { Map rawConfig = new HashMap<>(); ConfigOrError parsedConfig = new PickFirstLoadBalancerProvider().parseLoadBalancingPolicyConfig( rawConfig); - PickFirstLoadBalancerConfig config = (PickFirstLoadBalancerConfig) parsedConfig.getConfig(); - assertThat(config.shuffleAddressList).isNull(); - assertThat(config.randomSeed).isNull(); + Boolean shuffleAddressList; + Long randomSeed; + + if (PickFirstLoadBalancerProvider.isEnabledNewPickFirst()) { + PickFirstLeafLoadBalancerConfig config = + (PickFirstLeafLoadBalancerConfig) parsedConfig.getConfig(); + shuffleAddressList = config.shuffleAddressList; + randomSeed = config.randomSeed; + } else { + PickFirstLoadBalancerConfig config = (PickFirstLoadBalancerConfig) parsedConfig.getConfig(); + shuffleAddressList = config.shuffleAddressList; + randomSeed = config.randomSeed; + } + + assertThat(shuffleAddressList).isNull(); + assertThat(randomSeed).isNull(); } } diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index e8a857d884..b827bc7514 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -21,9 +21,9 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.google.common.base.Converter; @@ -106,8 +106,9 @@ public class RlsLoadBalancerTest { throw new RuntimeException(e); } }); + private final FakeHelper helperDelegate = new FakeHelper(); private final Helper helper = - mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper())); + mock(Helper.class, AdditionalAnswers.delegatesTo(helperDelegate)); private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl(); private final Deque subchannels = new LinkedList<>(); private final FakeThrottler fakeThrottler = new FakeThrottler(); @@ -119,6 +120,8 @@ public class RlsLoadBalancerTest { private MethodDescriptor fakeRescueMethod; private RlsLoadBalancer rlsLb; private String defaultTarget = "defaultTarget"; + private PickSubchannelArgsImpl searchSubchannelArgs; + private PickSubchannelArgsImpl rescueSubchannelArgs; @Before public void setUp() { @@ -159,6 +162,13 @@ public class RlsLoadBalancerTest { .setTicker(fakeClock.getTicker()); } }; + + Metadata headers = new Metadata(); + searchSubchannelArgs = + new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT); + rescueSubchannelArgs = + new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT); + } @After @@ -176,13 +186,13 @@ public class RlsLoadBalancerTest { Metadata headers = new Metadata(); PickSubchannelArgsImpl fakeSearchMethodArgs = new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT); - PickResult res = picker.pickSubchannel(fakeSearchMethodArgs); - FakeSubchannel subchannel = (FakeSubchannel) res.getSubchannel(); + picker.pickSubchannel(fakeSearchMethodArgs); // Will create the subchannel + FakeSubchannel subchannel = subchannels.peek(); assertThat(subchannel).isNotNull(); // Ensure happy path is unaffected subchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); - res = picker.pickSubchannel(fakeSearchMethodArgs); + PickResult res = picker.pickSubchannel(fakeSearchMethodArgs); assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK); // Check on conversion @@ -203,34 +213,28 @@ public class RlsLoadBalancerTest { inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); - Metadata headers = new Metadata(); - PickResult res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + PickResult res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); - inOrder.verify(helper) + inOrder.verify(helper, atLeast(0)) .updateBalancingState(eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); inOrder.verifyNoMoreInteractions(); assertThat(res.getStatus().isOk()).isTrue(); - assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); - assertThat(subchannels).hasSize(1); FakeSubchannel searchSubchannel = subchannels.getLast(); + assertThat(subchannelIsReady(searchSubchannel)).isFalse(); + searchSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); inOrder.verifyNoMoreInteractions(); + res = picker.pickSubchannel(searchSubchannelArgs); assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); - assertThat(res.getSubchannel().getAddresses()).isEqualTo(searchSubchannel.getAddresses()); - assertThat(res.getSubchannel().getAttributes()).isEqualTo(searchSubchannel.getAttributes()); + assertThat(res.getSubchannel()).isSameInstanceAs(searchSubchannel); // rescue should be pending status although the overall channel state is READY - res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT)); + res = picker.pickSubchannel(rescueSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); // other rls picker itself is ready due to first channel. - inOrder.verify(helper) - .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); - inOrder.verifyNoMoreInteractions(); assertThat(res.getStatus().isOk()).isTrue(); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); assertThat(subchannels).hasSize(2); @@ -238,7 +242,6 @@ public class RlsLoadBalancerTest { // search subchannel is down, rescue subchannel is connecting searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); - inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); @@ -248,8 +251,7 @@ public class RlsLoadBalancerTest { // search again, verify that it doesn't use fallback, since RLS server responded, even though // subchannel is in failure mode - res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); } @@ -263,52 +265,41 @@ public class RlsLoadBalancerTest { inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); - Metadata headers = new Metadata(); - PickResult res; // Search that when the RLS server doesn't respond, that fallback is used - res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); - FakeSubchannel fallbackSubchannel = (FakeSubchannel) res.getSubchannel(); - assertThat(fallbackSubchannel).isNotNull(); - + PickResult res = picker.pickSubchannel(searchSubchannelArgs); // create subchannel assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK); - assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); - inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); - fallbackSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); - inOrder.verify(helper, times(1)) - .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + FakeSubchannel fallbackSubchannel = + (FakeSubchannel) markReadyAndGetPickResult(inOrder, searchSubchannelArgs).getSubchannel(); + assertThat(fallbackSubchannel).isNotNull(); + assertThat(subchannelIsReady(fallbackSubchannel)).isTrue(); inOrder.verifyNoMoreInteractions(); - res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); - assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); - assertThat(res.getSubchannel()).isSameInstanceAs(fallbackSubchannel); + Subchannel subchannel = picker.pickSubchannel(searchSubchannelArgs).getSubchannel(); + assertThat(subchannelIsReady(subchannel)).isTrue(); + assertThat(subchannel).isSameInstanceAs(fallbackSubchannel); - res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT)); - assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); - assertThat(res.getSubchannel()).isSameInstanceAs(fallbackSubchannel); + subchannel = picker.pickSubchannel(searchSubchannelArgs).getSubchannel(); + assertThat(subchannelIsReady(subchannel)).isTrue(); + assertThat(subchannel).isSameInstanceAs(fallbackSubchannel); // Make sure that when RLS starts communicating that default stops being used fakeThrottler.nextResult = false; fakeClock.forwardTime(2, TimeUnit.SECONDS); // Expires backoff cache entries - // Create search subchannel - res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); - assertThat(res.getSubchannel()).isNotSameInstanceAs(fallbackSubchannel); - FakeSubchannel searchSubchannel = (FakeSubchannel) res.getSubchannel(); + + picker.pickSubchannel(searchSubchannelArgs);// Create search subchannel + FakeSubchannel searchSubchannel = + (FakeSubchannel) markReadyAndGetPickResult(inOrder, searchSubchannelArgs).getSubchannel(); assertThat(searchSubchannel).isNotNull(); - searchSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + assertThat(searchSubchannel).isNotSameInstanceAs(fallbackSubchannel); // create rescue subchannel - res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT)); - assertThat(res.getSubchannel()).isNotSameInstanceAs(fallbackSubchannel); - assertThat(res.getSubchannel()).isNotSameInstanceAs(searchSubchannel); - FakeSubchannel rescueSubchannel = (FakeSubchannel) res.getSubchannel(); + picker.pickSubchannel(rescueSubchannelArgs); + FakeSubchannel rescueSubchannel = + (FakeSubchannel) markReadyAndGetPickResult(inOrder, rescueSubchannelArgs).getSubchannel(); assertThat(rescueSubchannel).isNotNull(); - rescueSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + assertThat(rescueSubchannel).isNotSameInstanceAs(fallbackSubchannel); + assertThat(rescueSubchannel).isNotSameInstanceAs(searchSubchannel); // all channels are failed rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); @@ -316,7 +307,7 @@ public class RlsLoadBalancerTest { fallbackSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + searchSubchannelArgs); assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE); assertThat(res.getSubchannel()).isNull(); } @@ -330,37 +321,29 @@ public class RlsLoadBalancerTest { .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); Metadata headers = new Metadata(); - PickResult res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + PickResult res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); - inOrder.verify(helper) + inOrder.verify(helper, atLeast(0)) .updateBalancingState(eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); inOrder.verifyNoMoreInteractions(); assertThat(res.getStatus().isOk()).isTrue(); assertThat(subchannels).hasSize(1); - FakeSubchannel searchSubchannel = subchannels.getLast(); - searchSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); - inOrder.verify(helper) - .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + FakeSubchannel searchSubchannel = + (FakeSubchannel) markReadyAndGetPickResult(inOrder, searchSubchannelArgs).getSubchannel(); inOrder.verifyNoMoreInteractions(); - assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); - assertThat(res.getSubchannel().getAddresses()).isEqualTo(searchSubchannel.getAddresses()); - assertThat(res.getSubchannel().getAttributes()).isEqualTo(searchSubchannel.getAttributes()); + assertThat(subchannelIsReady(searchSubchannel)).isTrue(); + assertThat(subchannels.getLast()).isSameInstanceAs(searchSubchannel); // rescue should be pending status although the overall channel state is READY picker = pickerCaptor.getValue(); - res = picker.pickSubchannel( - new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT)); + res = picker.pickSubchannel(rescueSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); // other rls picker itself is ready due to first channel. - inOrder.verify(helper) - .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); - inOrder.verifyNoMoreInteractions(); assertThat(res.getStatus().isOk()).isTrue(); - assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); assertThat(subchannels).hasSize(2); FakeSubchannel rescueSubchannel = subchannels.getLast(); + assertThat(subchannelIsReady(rescueSubchannel)).isFalse(); // search subchannel is down, rescue subchannel is still connecting searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND)); @@ -388,6 +371,7 @@ public class RlsLoadBalancerTest { rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND)); inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + inOrder.verify(helper, atLeast(0)).refreshNameResolution(); inOrder.verifyNoMoreInteractions(); } @@ -406,10 +390,7 @@ public class RlsLoadBalancerTest { assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); - inOrder.verify(helper) - .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); assertThat(subchannels).hasSize(1); - inOrder.verifyNoMoreInteractions(); FakeSubchannel searchSubchannel = subchannels.getLast(); searchSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); @@ -438,6 +419,16 @@ public class RlsLoadBalancerTest { assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); } + private PickResult markReadyAndGetPickResult(InOrder inOrder, + PickSubchannelArgsImpl pickSubchannelArgs) { + subchannels.getLast().updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + inOrder.verify(helper, atLeast(1)) + .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + PickResult pickResult = pickerCaptor.getValue().pickSubchannel(pickSubchannelArgs); + inOrder.verify(helper, atLeast(0)).getChannelLogger(); + return pickResult; + } + private void deliverResolvedAddresses() throws Exception { ConfigOrError parsedConfigOrError = provider.parseLoadBalancingPolicyConfig(getServiceConfig()); diff --git a/util/src/test/java/io/grpc/util/MultiChildLoadBalancerTest.java b/util/src/test/java/io/grpc/util/MultiChildLoadBalancerTest.java index d90c5eab92..d071d97a36 100644 --- a/util/src/test/java/io/grpc/util/MultiChildLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/MultiChildLoadBalancerTest.java @@ -41,6 +41,7 @@ import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.Status; +import io.grpc.internal.PickFirstLoadBalancerProvider; import io.grpc.util.AbstractTestHelper.FakeSocketAddress; import io.grpc.util.MultiChildLoadBalancer.ChildLbState; import io.grpc.util.MultiChildLoadBalancer.Endpoint; @@ -81,8 +82,8 @@ public class MultiChildLoadBalancerTest { private ArgumentCaptor stateCaptor; @Captor private ArgumentCaptor createArgsCaptor; - private TestHelper testHelperInst = new TestHelper(); - private LoadBalancer.Helper mockHelper = + private final TestHelper testHelperInst = new TestHelper(); + private final LoadBalancer.Helper mockHelper = mock(LoadBalancer.Helper.class, delegatesTo(testHelperInst)); private TestLb loadBalancer; @@ -99,7 +100,7 @@ public class MultiChildLoadBalancerTest { } @Test - public void pickAfterResolved() throws Exception { + public void pickAfterResolved() { Status addressesAcceptanceStatus = loadBalancer.acceptResolvedAddresses( LoadBalancer.ResolvedAddresses.newBuilder().setAddresses(servers).build()); assertThat(addressesAcceptanceStatus.isOk()).isTrue(); @@ -131,7 +132,7 @@ public class MultiChildLoadBalancerTest { } @Test - public void pickAfterResolvedUpdatedHosts() throws Exception { + public void pickAfterResolvedUpdatedHosts() { Attributes.Key key = Attributes.Key.create("check-that-it-is-propagated"); FakeSocketAddress removedAddr = new FakeSocketAddress("removed"); EquivalentAddressGroup removedEag = new EquivalentAddressGroup(removedAddr); @@ -195,7 +196,7 @@ public class MultiChildLoadBalancerTest { } @Test - public void pickFromMultiAddressEags() throws Exception { + public void pickFromMultiAddressEags() { List addressList1 = new ArrayList<>(); List addressList2 = new ArrayList<>(); for (int i = 0; i < 3; i++) { @@ -215,7 +216,7 @@ public class MultiChildLoadBalancerTest { LoadBalancer.ResolvedAddresses.newBuilder().setAddresses(multiGroups).build()); assertTrue(addressesAcceptanceStatus.isOk()); - LoadBalancer.Subchannel evens = subchannels.get(Collections.singletonList(eag1)); + LoadBalancer.Subchannel evens = getSubchannel(eag1); deliverSubchannelState(evens, ConnectivityStateInfo.forNonError(READY)); verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); assertThat(pickerCaptor.getValue()).isInstanceOf(TestLb.TestSubchannelPicker.class); @@ -321,8 +322,20 @@ public class MultiChildLoadBalancerTest { return new Endpoint(eag); } - private LoadBalancer.Subchannel getSubchannel(EquivalentAddressGroup removedEag) { - return subchannels.get(Collections.singletonList(removedEag)); + private LoadBalancer.Subchannel getSubchannel(EquivalentAddressGroup eag) { + if (PickFirstLoadBalancerProvider.isEnabledNewPickFirst()) { + for (SocketAddress addr : eag.getAddresses()) { + LoadBalancer.Subchannel subchannel = subchannels.get( + Arrays.asList(new EquivalentAddressGroup(addr, eag.getAttributes()))); + if (subchannel != null) { + return subchannel; + } + } + } else { + return subchannels.get(Collections.singletonList(eag)); + } + + return null; } private static List getChildEags(MultiChildLoadBalancer loadBalancer) { diff --git a/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java b/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java index 87bd50a58b..f8b700dca2 100644 --- a/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/OutlierDetectionLoadBalancerTest.java @@ -53,6 +53,7 @@ import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock.ScheduledTask; +import io.grpc.internal.PickFirstLoadBalancerProvider; import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.internal.TestUtils.StandardLoadBalancerProvider; import io.grpc.util.OutlierDetectionLoadBalancer.EndpointTracker; @@ -409,6 +410,9 @@ public class OutlierDetectionLoadBalancerTest { SubchannelPicker picker = pickerCaptor.getAllValues().get(2); PickResult pickResult = picker.pickSubchannel(mock(PickSubchannelArgs.class)); Subchannel s = ((OutlierDetectionSubchannel) pickResult.getSubchannel()).delegate(); + if (s instanceof HealthProducerHelper.HealthProducerSubchannel) { + s = ((HealthProducerHelper.HealthProducerSubchannel) s).delegate(); + } assertThat(s).isEqualTo(readySubchannel); } @@ -564,7 +568,9 @@ public class OutlierDetectionLoadBalancerTest { loadBalancer.acceptResolvedAddresses(buildResolvedAddress(config, servers)); - generateLoad(ImmutableMap.of(subchannel2, Status.DEADLINE_EXCEEDED), 12); + // The PickFirstLeafLB has an extra level of indirection because of health + int expectedStateChanges = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 16 : 12; + generateLoad(ImmutableMap.of(subchannel2, Status.DEADLINE_EXCEEDED), expectedStateChanges); // Move forward in time to a point where the detection timer has fired. forwardTime(config); @@ -597,8 +603,9 @@ public class OutlierDetectionLoadBalancerTest { // The one subchannel that was returning errors should be ejected. assertEjectedSubchannels(ImmutableSet.of(ImmutableSet.copyOf(servers.get(0).getAddresses()))); - // Now we produce more load, but the subchannel start working and is no longer an outlier. - generateLoad(ImmutableMap.of(), 12); + // Now we produce more load, but the subchannel has started working and is no longer an outlier. + int expectedStateChanges = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 16 : 12; + generateLoad(ImmutableMap.of(), expectedStateChanges); // Move forward in time to a point where the detection timer has fired. fakeClock.forwardTime(config.maxEjectionTimeNanos + 1, TimeUnit.NANOSECONDS); diff --git a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 6b935ec3aa..e061a4d0b6 100644 --- a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -30,6 +30,8 @@ import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -244,7 +246,7 @@ public class RoundRobinLoadBalancerTest { // TODO figure out if this method testing the right things ChildLbState childLbState = loadBalancer.getChildLbStates().iterator().next(); - Subchannel subchannel = childLbState.getCurrentPicker().pickSubchannel(null).getSubchannel(); + Subchannel subchannel = subchannels.get(Arrays.asList(childLbState.getEag())); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); assertThat(childLbState.getCurrentState()).isEqualTo(CONNECTING); @@ -258,16 +260,14 @@ public class RoundRobinLoadBalancerTest { deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); assertThat(childLbState.getCurrentState()).isEqualTo(TRANSIENT_FAILURE); - inOrder.verify(mockHelper).refreshNameResolution(); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + AbstractTestHelper.refreshInvokedAndUpdateBS(inOrder, CONNECTING, mockHelper, pickerCaptor); assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class); - deliverSubchannelState(subchannel, - ConnectivityStateInfo.forNonError(IDLE)); + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(mockHelper).refreshNameResolution(); assertThat(childLbState.getCurrentState()).isEqualTo(TRANSIENT_FAILURE); - verify(subchannel, times(2)).requestConnection(); + verify(subchannel, atLeastOnce()).requestConnection(); verify(mockHelper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verifyNoMoreInteractions(mockHelper); } @@ -302,20 +302,20 @@ public class RoundRobinLoadBalancerTest { Map childToSubChannelMap = new HashMap<>(); // Simulate state transitions for each subchannel individually. for ( ChildLbState child : loadBalancer.getChildLbStates()) { - Subchannel sc = child.getSubchannels(mockArgs); + Subchannel sc = subchannels.get(Arrays.asList(child.getEag())); childToSubChannelMap.put(child, sc); Status error = Status.UNKNOWN.withDescription("connection broken"); deliverSubchannelState( sc, ConnectivityStateInfo.forTransientFailure(error)); assertEquals(TRANSIENT_FAILURE, child.getCurrentState()); - inOrder.verify(mockHelper).refreshNameResolution(); deliverSubchannelState( sc, ConnectivityStateInfo.forNonError(CONNECTING)); assertEquals(TRANSIENT_FAILURE, child.getCurrentState()); } inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), isA(ReadyPicker.class)); + inOrder.verify(mockHelper, atLeast(0)).refreshNameResolution(); inOrder.verifyNoMoreInteractions(); ChildLbState child = loadBalancer.getChildLbStates().iterator().next(); @@ -325,7 +325,8 @@ public class RoundRobinLoadBalancerTest { inOrder.verify(mockHelper).updateBalancingState(eq(READY), isA(ReadyPicker.class)); verify(mockHelper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); - verifyNoMoreInteractions(mockHelper); + inOrder.verify(mockHelper, atLeast(0)).refreshNameResolution(); + inOrder.verifyNoMoreInteractions(); } @Test @@ -339,7 +340,7 @@ public class RoundRobinLoadBalancerTest { // Simulate state transitions for each subchannel individually. for (ChildLbState child : loadBalancer.getChildLbStates()) { - Subchannel sc = child.getSubchannels(mockArgs); + Subchannel sc = subchannels.get(Arrays.asList(child.getEag())); verify(sc).requestConnection(); deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(CONNECTING)); Status error = Status.UNKNOWN.withDescription("connection broken"); diff --git a/util/src/testFixtures/java/io/grpc/util/AbstractTestHelper.java b/util/src/testFixtures/java/io/grpc/util/AbstractTestHelper.java index d8e75e939f..9c71ee98a9 100644 --- a/util/src/testFixtures/java/io/grpc/util/AbstractTestHelper.java +++ b/util/src/testFixtures/java/io/grpc/util/AbstractTestHelper.java @@ -17,6 +17,7 @@ package io.grpc.util; import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import com.google.common.collect.Maps; @@ -31,11 +32,14 @@ import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelStateListener; +import io.grpc.internal.PickFirstLoadBalancerProvider; import java.net.SocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; /** * A real class that can be used as a delegate of a mock Helper to provide more real representation @@ -129,6 +133,22 @@ public abstract class AbstractTestHelper extends ForwardingLoadBalancerHelper { return "Test Helper"; } + public static void refreshInvokedAndUpdateBS(InOrder inOrder, ConnectivityState state, + Helper helper, + ArgumentCaptor pickerCaptor) { + // Old PF and new PF reverse calling order of updateBlaancingState and refreshNameResolution + if (PickFirstLoadBalancerProvider.isEnabledNewPickFirst()) { + inOrder.verify(helper).updateBalancingState(eq(state), pickerCaptor.capture()); + } + + inOrder.verify(helper).refreshNameResolution(); + + if (!PickFirstLoadBalancerProvider.isEnabledNewPickFirst()) { + inOrder.verify(helper).updateBalancingState(eq(state), pickerCaptor.capture()); + } + } + + protected class TestSubchannel extends ForwardingSubchannel { CreateSubchannelArgs args; Channel channel; diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 76bac3118d..1316b22c52 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -530,16 +530,17 @@ final class RingHashLoadBalancer extends MultiChildLoadBalancer { @Override public void updateBalancingState(final ConnectivityState newState, final SubchannelPicker newPicker) { + // Subchannel picker and state are saved, but will only be propagated to the channel + // when the child instance exits deactivated state. + setCurrentState(newState); + setCurrentPicker(newPicker); + // If we are already in the process of resolving addresses, the overall balancing state // will be updated at the end of it, and we don't need to trigger that update here. if (getChildLbState(getKey()) == null) { return; } - // Subchannel picker and state are saved, but will only be propagated to the channel - // when the child instance exits deactivated state. - setCurrentState(newState); - setCurrentPicker(newPicker); if (!isDeactivated() && !resolvingAddresses) { updateOverallBalancingState(); } diff --git a/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java index 9afe82d04e..47fd5a84e4 100644 --- a/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/LeastRequestLoadBalancerTest.java @@ -31,6 +31,7 @@ import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -58,6 +59,7 @@ import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelStateListener; import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.internal.PickFirstLoadBalancerProvider; import io.grpc.util.AbstractTestHelper; import io.grpc.util.MultiChildLoadBalancer.ChildLbState; import io.grpc.xds.LeastRequestLoadBalancer.EmptyPicker; @@ -266,28 +268,25 @@ public class LeastRequestLoadBalancerTest { inOrder.verify(helper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); assertThat(childLbState.getCurrentState()).isEqualTo(CONNECTING); - deliverSubchannelState(subchannel, - ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); assertThat(pickerCaptor.getValue()).isInstanceOf(ReadyPicker.class); assertThat(childLbState.getCurrentState()).isEqualTo(READY); Status error = Status.UNKNOWN.withDescription("¯\\_(ツ)_//¯"); - deliverSubchannelState(subchannel, - ConnectivityStateInfo.forTransientFailure(error)); + deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); assertThat(childLbState.getCurrentState()).isEqualTo(TRANSIENT_FAILURE); assertThat(childLbState.getCurrentPicker().toString()).contains(error.toString()); - inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + refreshInvokedAndUpdateBS(inOrder, CONNECTING); assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class); - deliverSubchannelState(subchannel, - ConnectivityStateInfo.forNonError(IDLE)); + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(helper).refreshNameResolution(); assertThat(childLbState.getCurrentState()).isEqualTo(TRANSIENT_FAILURE); assertThat(childLbState.getCurrentPicker().toString()).contains(error.toString()); - verify(subchannel, times(2)).requestConnection(); + int expectedCount = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2; + verify(subchannel, times(expectedCount)).requestConnection(); verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); verifyNoMoreInteractions(helper); } @@ -358,14 +357,15 @@ public class LeastRequestLoadBalancerTest { Subchannel sc = getSubchannel(childLbState); Status error = Status.UNKNOWN.withDescription("connection broken"); deliverSubchannelState(sc, ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(helper).refreshNameResolution(); deliverSubchannelState(sc, ConnectivityStateInfo.forNonError(CONNECTING)); assertThat(childLbState.getCurrentState()).isEqualTo(TRANSIENT_FAILURE); } - inOrder.verify(helper) - .updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + + verify(helper, atLeast(loadBalancer.getChildLbStates().size())).refreshNameResolution(); + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); assertThat(getStatusString(pickerCaptor.getValue())) .contains("Status{code=UNKNOWN, description=connection broken"); + inOrder.verify(helper, atLeast(0)).refreshNameResolution(); inOrder.verifyNoMoreInteractions(); ChildLbState childLbState = loadBalancer.getChildLbStates().iterator().next(); @@ -660,6 +660,19 @@ public class LeastRequestLoadBalancerTest { testHelperInstance.deliverSubchannelState(subchannel, newState); } + // Old PF and new PF reverse calling order of updateBlaancingState and refreshNameResolution + private void refreshInvokedAndUpdateBS(InOrder inOrder, ConnectivityState state) { + if (PickFirstLoadBalancerProvider.isEnabledNewPickFirst()) { + inOrder.verify(helper).updateBalancingState(eq(state), pickerCaptor.capture()); + } + + inOrder.verify(helper).refreshNameResolution(); + + if (!PickFirstLoadBalancerProvider.isEnabledNewPickFirst()) { + inOrder.verify(helper).updateBalancingState(eq(state), pickerCaptor.capture()); + } + } + private static class FakeSocketAddress extends SocketAddress { final String name; diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index 9d88998fe7..19383df72b 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; @@ -30,6 +31,7 @@ import static io.grpc.xds.RingHashLoadBalancerTest.InitializationFlags.STAY_IN_C import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.mock; @@ -43,6 +45,7 @@ import com.google.common.collect.Iterables; import com.google.common.primitives.UnsignedInteger; import io.grpc.Attributes; import io.grpc.CallOptions; +import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer.CreateSubchannelArgs; @@ -56,6 +59,7 @@ import io.grpc.Metadata; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.SynchronizationContext; +import io.grpc.internal.PickFirstLoadBalancerProvider; import io.grpc.internal.PickSubchannelArgsImpl; import io.grpc.testing.TestMethodDescriptors; import io.grpc.util.AbstractTestHelper; @@ -89,6 +93,9 @@ import org.mockito.junit.MockitoRule; public class RingHashLoadBalancerTest { private static final String AUTHORITY = "foo.googleapis.com"; private static final Attributes.Key CUSTOM_KEY = Attributes.Key.create("custom-key"); + private static final ConnectivityStateInfo CSI_CONNECTING = + ConnectivityStateInfo.forNonError(CONNECTING); + public static final ConnectivityStateInfo CSI_READY = ConnectivityStateInfo.forNonError(READY); @Rule public final MockitoRule mocks = MockitoJUnit.rule(); @@ -145,11 +152,12 @@ public class RingHashLoadBalancerTest { verify(subchannel).requestConnection(); verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); - verify(helper, times(2)).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + deliverSubchannelState(subchannel, CSI_CONNECTING); + int expectedCount = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2; + verify(helper, times(expectedCount)).updateBalancingState(eq(CONNECTING), any()); // Subchannel becomes ready, triggers pick again. - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel, CSI_READY); verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getSubchannel()).isSameInstanceAs(subchannel); @@ -174,11 +182,13 @@ public class RingHashLoadBalancerTest { PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); pickerCaptor.getValue().pickSubchannel(args); assertThat(childLbState.isDeactivated()).isFalse(); - assertThat(childLbState.getLb().delegateType()).isEqualTo("PickFirstLoadBalancer"); + String expectedLbType = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() + ? "PickFirstLeafLoadBalancer" : "PickFirstLoadBalancer"; + assertThat(childLbState.getLb().delegateType()).isEqualTo(expectedLbType); Subchannel subchannel = subchannels.get(Collections.singletonList(childLbState.getEag())); InOrder inOrder = Mockito.inOrder(helper, subchannel); inOrder.verify(subchannel).requestConnection(); - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel, CSI_READY); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); @@ -198,50 +208,51 @@ public class RingHashLoadBalancerTest { initializeLbSubchannels(config, servers); // one in CONNECTING, one in IDLE - deliverSubchannelState( - subchannels.get(Collections.singletonList(servers.get(0))), - ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(getSubchannel(servers, 0), CSI_CONNECTING); inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); verifyConnection(0); // two in CONNECTING - deliverSubchannelState( - subchannels.get(Collections.singletonList(servers.get(1))), - ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(getSubchannel(servers, 1), CSI_CONNECTING); inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); verifyConnection(0); // one in CONNECTING, one in READY - deliverSubchannelState( - subchannels.get(Collections.singletonList(servers.get(1))), - ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(getSubchannel(servers, 1), CSI_READY); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); verifyConnection(0); // one in TRANSIENT_FAILURE, one in READY deliverSubchannelState( - subchannels.get(Collections.singletonList(servers.get(0))), + getSubchannel(servers, 0), ConnectivityStateInfo.forTransientFailure( Status.UNKNOWN.withDescription("unknown failure"))); - inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); + if (PickFirstLoadBalancerProvider.isEnabledNewPickFirst()) { + inOrder.verify(helper).updateBalancingState(eq(READY), any()); + } else { + inOrder.verify(helper).refreshNameResolution(); + inOrder.verify(helper).updateBalancingState(eq(READY), any()); + } verifyConnection(0); // one in TRANSIENT_FAILURE, one in IDLE deliverSubchannelState( - subchannels.get(Collections.singletonList(servers.get(1))), + getSubchannel(servers, 1), ConnectivityStateInfo.forNonError(IDLE)); - inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + if (PickFirstLoadBalancerProvider.isEnabledNewPickFirst()) { + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any()); + } else { + inOrder.verify(helper).refreshNameResolution(); + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any()); + } verifyConnection(0); - - verifyNoMoreInteractions(helper); } private void verifyConnection(int times) { for (int i = 0; i < times; i++) { Subchannel connectOnce = connectionRequestedQueue.poll(); - assertThat(connectOnce).isNotNull(); + assertWithMessage("Null connection is at (%s) of (%s)", i, times) + .that(connectOnce).isNotNull(); clearInvocations(connectOnce); } assertThat(connectionRequestedQueue.poll()).isNull(); @@ -261,37 +272,48 @@ public class RingHashLoadBalancerTest { // one in TRANSIENT_FAILURE, three in CONNECTING deliverNotFound(subChannelList, 0); - inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); + refreshInvokedButNotUpdateBS(inOrder, TRANSIENT_FAILURE); // two in TRANSIENT_FAILURE, two in CONNECTING deliverNotFound(subChannelList, 1); - inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper) - .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + refreshInvokedAndUpdateBS(inOrder, TRANSIENT_FAILURE); // All 4 in TF switch to TF deliverNotFound(subChannelList, 2); - inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper) - .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + refreshInvokedAndUpdateBS(inOrder, TRANSIENT_FAILURE); deliverNotFound(subChannelList, 3); - inOrder.verify(helper).refreshNameResolution(); - inOrder.verify(helper) - .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); + refreshInvokedAndUpdateBS(inOrder, TRANSIENT_FAILURE); // reset subchannel to CONNECTING - shouldn't change anything since PF hides the state change - deliverSubchannelState(subChannelList.get(2), ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(subChannelList.get(2), CSI_CONNECTING); inOrder.verify(helper, never()) .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); inOrder.verify(subChannelList.get(2), never()).requestConnection(); // three in TRANSIENT_FAILURE, one in READY - deliverSubchannelState(subChannelList.get(2), ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subChannelList.get(2), CSI_READY); inOrder.verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); inOrder.verify(subChannelList.get(2), never()).requestConnection(); + } - verifyNoMoreInteractions(helper); + // Old PF and new PF reverse calling order of updateBlaancingState and refreshNameResolution + private void refreshInvokedButNotUpdateBS(InOrder inOrder, ConnectivityState state) { + inOrder.verify(helper, never()).updateBalancingState(eq(state), any(SubchannelPicker.class)); + inOrder.verify(helper).refreshNameResolution(); + inOrder.verify(helper, never()).updateBalancingState(eq(state), any(SubchannelPicker.class)); + } + + // Old PF and new PF reverse calling order of updateBlaancingState and refreshNameResolution + private void refreshInvokedAndUpdateBS(InOrder inOrder, ConnectivityState state) { + if (PickFirstLoadBalancerProvider.isEnabledNewPickFirst()) { + inOrder.verify(helper).updateBalancingState(eq(state), any()); + } + + inOrder.verify(helper).refreshNameResolution(); + + if (!PickFirstLoadBalancerProvider.isEnabledNewPickFirst()) { + inOrder.verify(helper).updateBalancingState(eq(state), any()); + } } @Test @@ -319,7 +341,7 @@ public class RingHashLoadBalancerTest { // Bring all subchannels to READY so that next pick always succeeds. for (Subchannel subchannel : subchannels.values()) { - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel, CSI_READY); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); } @@ -336,8 +358,8 @@ public class RingHashLoadBalancerTest { Attributes attr = addr.getAttributes().toBuilder().set(CUSTOM_KEY, "custom value").build(); updatedServers.add(new EquivalentAddressGroup(addr.getAddresses(), attr)); } - Subchannel subchannel0_old = subchannels.get(Collections.singletonList(servers.get(0))); - Subchannel subchannel1_old = subchannels.get(Collections.singletonList(servers.get(1))); + Subchannel subchannel0_old = getSubchannel(servers, 0); + Subchannel subchannel1_old = getSubchannel(servers, 1); Status addressesAcceptanceStatus = loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(updatedServers).setLoadBalancingPolicyConfig(config).build()); @@ -360,7 +382,7 @@ public class RingHashLoadBalancerTest { // Bring all subchannels to READY so that next pick always succeeds. for (Subchannel subchannel : subchannels.values()) { - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel, CSI_READY); inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); } @@ -425,16 +447,15 @@ public class RingHashLoadBalancerTest { verify(getSubChannel(servers.get(1))).requestConnection(); // kicked off connection to server2 assertThat(subchannels.size()).isEqualTo(2); // no excessive connection - reset(helper); - deliverSubchannelState(getSubChannel(servers.get(1)), - ConnectivityStateInfo.forNonError(CONNECTING)); - verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + deliverSubchannelState(getSubChannel(servers.get(1)), CSI_CONNECTING); + verify(helper, atLeast(1)) + .updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request - deliverSubchannelState(getSubChannel(servers.get(1)), ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(getSubChannel(servers.get(1)), CSI_READY); verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); result = pickerCaptor.getValue().pickSubchannel(args); @@ -471,21 +492,22 @@ public class RingHashLoadBalancerTest { // Bring down server0 and server2 to force trying server1. deliverSubchannelState( - subchannels.get(Collections.singletonList(servers.get(1))), + getSubchannel(servers, 1), ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("unreachable"))); deliverSubchannelState( - subchannels.get(Collections.singletonList(servers.get(2))), + getSubchannel(servers, 2), ConnectivityStateInfo.forTransientFailure( Status.PERMISSION_DENIED.withDescription("permission denied"))); verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verifyConnection(0); PickResult result = pickerCaptor.getValue().pickSubchannel(args); // activate last subchannel assertThat(result.getStatus().isOk()).isTrue(); - verifyConnection(1); + int expectedCount = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 0 : 1; + verifyConnection(expectedCount); deliverSubchannelState( - subchannels.get(Collections.singletonList(servers.get(0))), + getSubchannel(servers, 0), ConnectivityStateInfo.forTransientFailure( Status.PERMISSION_DENIED.withDescription("permission denied again"))); verify(helper, times(2)).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); @@ -496,9 +518,7 @@ public class RingHashLoadBalancerTest { assertThat(result.getStatus().getDescription()).isEqualTo("unreachable"); // Now connecting to server1. - deliverSubchannelState( - subchannels.get(Collections.singletonList(servers.get(1))), - ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(getSubchannel(servers, 1), CSI_CONNECTING); reset(helper); @@ -509,9 +529,7 @@ public class RingHashLoadBalancerTest { assertThat(result.getStatus().getDescription()).isEqualTo("unreachable"); // Simulate server1 becomes READY. - deliverSubchannelState( - subchannels.get(Collections.singletonList(servers.get(1))), - ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(getSubchannel(servers, 1), CSI_READY); verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); @@ -574,7 +592,7 @@ public class RingHashLoadBalancerTest { initializeLbSubchannels(config, servers); // Go to TF does nothing, though PF will try to reconnect after backoff - deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))), + deliverSubchannelState(getSubchannel(servers, 1), ConnectivityStateInfo.forTransientFailure( Status.UNAVAILABLE.withDescription("unreachable"))); verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); @@ -594,22 +612,21 @@ public class RingHashLoadBalancerTest { List servers = createWeightedServerAddrs(1, 1, 1); initializeLbSubchannels(config, servers); - deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(0))), - ConnectivityStateInfo.forNonError(CONNECTING)); - deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))), - ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(getSubchannel(servers, 0), CSI_CONNECTING); + deliverSubchannelState(getSubchannel(servers, 1), CSI_CONNECTING); verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); // Picking subchannel triggers connection. PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); - verify(subchannels.get(Collections.singletonList(servers.get(0))), never()) - .requestConnection(); - verify(subchannels.get(Collections.singletonList(servers.get(1))), never()) - .requestConnection(); - verify(subchannels.get(Collections.singletonList(servers.get(2))), never()) - .requestConnection(); + verify(getSubchannel(servers, 0), never()).requestConnection(); + verify(getSubchannel(servers, 1), never()).requestConnection(); + verify(getSubchannel(servers, 2), never()).requestConnection(); + } + + private Subchannel getSubchannel(List servers, int serverIndex) { + return subchannels.get(Collections.singletonList(servers.get(serverIndex))); } @Test @@ -656,17 +673,16 @@ public class RingHashLoadBalancerTest { // "FakeSocketAddress-server0_0" // "FakeSocketAddress-server2_0" - Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0))); + Subchannel firstSubchannel = getSubchannel(servers, 0); deliverSubchannelUnreachable(firstSubchannel); verifyConnection(0); - deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))), - ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(getSubchannel(servers, 2), CSI_CONNECTING); verify(helper, times(2)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verifyConnection(0); // Picking subchannel when idle triggers connection. - deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(2))), + deliverSubchannelState(getSubchannel(servers, 2), ConnectivityStateInfo.forNonError(IDLE)); verifyConnection(0); PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); @@ -688,9 +704,9 @@ public class RingHashLoadBalancerTest { // "FakeSocketAddress-server0_0" // "FakeSocketAddress-server2_0" - Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0))); + Subchannel firstSubchannel = getSubchannel(servers, 0); deliverSubchannelUnreachable(firstSubchannel); - deliverSubchannelUnreachable(subchannels.get(Collections.singletonList(servers.get(2)))); + deliverSubchannelUnreachable(getSubchannel(servers, 2)); verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); verifyConnection(0); @@ -698,7 +714,7 @@ public class RingHashLoadBalancerTest { PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); - verify(subchannels.get(Collections.singletonList(servers.get(1)))).requestConnection(); + verify(getSubchannel(servers, 1)).requestConnection(); verifyConnection(1); } @@ -715,12 +731,11 @@ public class RingHashLoadBalancerTest { // "FakeSocketAddress-server0_0" // "FakeSocketAddress-server2_0" - Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0))); + Subchannel firstSubchannel = getSubchannel(servers, 0); deliverSubchannelUnreachable(firstSubchannel); - deliverSubchannelUnreachable(subchannels.get(Collections.singletonList(servers.get(2)))); - deliverSubchannelState(subchannels.get(Collections.singletonList(servers.get(1))), - ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelUnreachable(getSubchannel(servers, 2)); + deliverSubchannelState(getSubchannel(servers, 1), CSI_CONNECTING); verify(helper, atLeastOnce()) .updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); verifyConnection(0); @@ -741,7 +756,7 @@ public class RingHashLoadBalancerTest { initializeLbSubchannels(config, servers); // Bring one subchannel to TRANSIENT_FAILURE. - Subchannel firstSubchannel = subchannels.get(Collections.singletonList(servers.get(0))); + Subchannel firstSubchannel = getSubchannel(servers, 0); deliverSubchannelUnreachable(firstSubchannel); verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); @@ -752,15 +767,14 @@ public class RingHashLoadBalancerTest { // Should not have called updateBalancingState on the helper again because PickFirst is // shielding the higher level from the state change. verify(helper, never()).updateBalancingState(any(), any()); - verifyConnection(1); + verifyConnection(PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 0 : 1); // Picking subchannel triggers connection on second address. RPC hash hits server0. PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid()); PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); - verify(subchannels.get(Collections.singletonList(servers.get(1)))).requestConnection(); - verify(subchannels.get(Collections.singletonList(servers.get(2))), never()) - .requestConnection(); + verify(getSubchannel(servers, 1)).requestConnection(); + verify(getSubchannel(servers, 2), never()).requestConnection(); } @Test @@ -811,7 +825,7 @@ public class RingHashLoadBalancerTest { // Bring all subchannels to READY. Map pickCounts = new HashMap<>(); for (Subchannel subchannel : subchannels.values()) { - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + deliverSubchannelState(subchannel, CSI_READY); pickCounts.put(subchannel.getAddresses(), 0); } verify(helper, times(3)).updateBalancingState(eq(READY), pickerCaptor.capture()); @@ -858,7 +872,7 @@ public class RingHashLoadBalancerTest { pickerCaptor.getValue().pickSubchannel(args); verify(helper, never()).updateBalancingState(eq(READY), any(SubchannelPicker.class)); deliverSubchannelState( - Iterables.getOnlyElement(subchannels.values()), ConnectivityStateInfo.forNonError(READY)); + Iterables.getOnlyElement(subchannels.values()), CSI_READY); verify(helper).updateBalancingState(eq(READY), any(SubchannelPicker.class)); reset(helper);