diff --git a/grpclb/src/main/java/io/grpc/grpclb/CachedSubchannelPool.java b/grpclb/src/main/java/io/grpc/grpclb/CachedSubchannelPool.java index b033fef388..b70b5d5907 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/CachedSubchannelPool.java +++ b/grpclb/src/main/java/io/grpc/grpclb/CachedSubchannelPool.java @@ -21,7 +21,9 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; +import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Subchannel; import io.grpc.SynchronizationContext.ScheduledHandle; @@ -37,31 +39,51 @@ final class CachedSubchannelPool implements SubchannelPool { new HashMap<>(); private Helper helper; + private LoadBalancer lb; @VisibleForTesting static final long SHUTDOWN_TIMEOUT_MS = 10000; @Override - public void init(Helper helper) { + public void init(Helper helper, LoadBalancer lb) { this.helper = checkNotNull(helper, "helper"); + this.lb = checkNotNull(lb, "lb"); } @Override public Subchannel takeOrCreateSubchannel( EquivalentAddressGroup eag, Attributes defaultAttributes) { - CacheEntry entry = cache.remove(eag); - Subchannel subchannel; + final CacheEntry entry = cache.remove(eag); + final Subchannel subchannel; if (entry == null) { subchannel = helper.createSubchannel(eag, defaultAttributes); } else { subchannel = entry.subchannel; entry.shutdownTimer.cancel(); + // Make the balancer up-to-date with the latest state in case it has changed while it's + // in the cache. + helper.getSynchronizationContext().execute(new Runnable() { + @Override + public void run() { + lb.handleSubchannelState(subchannel, entry.state); + } + }); } return subchannel; } @Override - public void returnSubchannel(Subchannel subchannel) { + public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newStateInfo) { + CacheEntry cached = cache.get(subchannel.getAddresses()); + if (cached == null || cached.subchannel != subchannel) { + // Given subchannel is not cached. Not our responsibility. + return; + } + cached.state = newStateInfo; + } + + @Override + public void returnSubchannel(Subchannel subchannel, ConnectivityStateInfo lastKnownState) { CacheEntry prev = cache.get(subchannel.getAddresses()); if (prev != null) { // Returning the same Subchannel twice has no effect. @@ -77,7 +99,7 @@ final class CachedSubchannelPool implements SubchannelPool { helper.getSynchronizationContext().schedule( shutdownTask, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS, helper.getScheduledExecutorService()); - CacheEntry entry = new CacheEntry(subchannel, shutdownTimer); + CacheEntry entry = new CacheEntry(subchannel, shutdownTimer, lastKnownState); cache.put(subchannel.getAddresses(), entry); } @@ -110,10 +132,12 @@ final class CachedSubchannelPool implements SubchannelPool { private static class CacheEntry { final Subchannel subchannel; final ScheduledHandle shutdownTimer; + ConnectivityStateInfo state; - CacheEntry(Subchannel subchannel, ScheduledHandle shutdownTimer) { + CacheEntry(Subchannel subchannel, ScheduledHandle shutdownTimer, ConnectivityStateInfo state) { this.subchannel = checkNotNull(subchannel, "subchannel"); this.shutdownTimer = checkNotNull(shutdownTimer, "shutdownTimer"); + this.state = checkNotNull(state, "state"); } } } diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index b0719f0625..181d05657a 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -71,7 +71,7 @@ class GrpclbLoadBalancer extends LoadBalancer { this.time = checkNotNull(time, "time provider"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool"); - this.subchannelPool.init(helper); + this.subchannelPool.init(helper, this); recreateStates(); checkNotNull(grpclbState, "grpclbState"); } diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 4d87f32d24..bc647504a5 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -159,7 +159,8 @@ final class GrpclbState { this.mode = checkNotNull(mode, "mode"); this.helper = checkNotNull(helper, "helper"); this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); - this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool"); + this.subchannelPool = + mode == Mode.ROUND_ROBIN ? checkNotNull(subchannelPool, "subchannelPool") : null; this.time = checkNotNull(time, "time provider"); this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); @@ -168,7 +169,13 @@ final class GrpclbState { } void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { - if (newState.getState() == SHUTDOWN || !subchannels.values().contains(subchannel)) { + if (newState.getState() == SHUTDOWN) { + return; + } + if (!subchannels.values().contains(subchannel)) { + if (subchannelPool != null ) { + subchannelPool.handleSubchannelState(subchannel, newState); + } return; } if (mode == Mode.ROUND_ROBIN && newState.getState() == IDLE) { @@ -311,8 +318,9 @@ final class GrpclbState { // We close the subchannels through subchannelPool instead of helper just for convenience of // testing. for (Subchannel subchannel : subchannels.values()) { - subchannelPool.returnSubchannel(subchannel); + returnSubchannelToPool(subchannel); } + subchannelPool.clear(); break; case PICK_FIRST: checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels); @@ -322,7 +330,6 @@ final class GrpclbState { throw new AssertionError("Missing case for " + mode); } subchannels = Collections.emptyMap(); - subchannelPool.clear(); cancelFallbackTimer(); cancelLbRpcRetryTimer(); } @@ -335,6 +342,10 @@ final class GrpclbState { } } + private void returnSubchannelToPool(Subchannel subchannel) { + subchannelPool.returnSubchannel(subchannel, subchannel.getAttributes().get(STATE_INFO).get()); + } + @VisibleForTesting @Nullable GrpclbClientLoadRecorder getLoadRecorder() { @@ -383,7 +394,7 @@ final class GrpclbState { for (Entry, Subchannel> entry : subchannels.entrySet()) { List eagList = entry.getKey(); if (!newSubchannelMap.containsKey(eagList)) { - subchannelPool.returnSubchannel(entry.getValue()); + returnSubchannelToPool(entry.getValue()); } } subchannels = Collections.unmodifiableMap(newSubchannelMap); diff --git a/grpclb/src/main/java/io/grpc/grpclb/SubchannelPool.java b/grpclb/src/main/java/io/grpc/grpclb/SubchannelPool.java index 5cc6769156..0d328fdb09 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/SubchannelPool.java +++ b/grpclb/src/main/java/io/grpc/grpclb/SubchannelPool.java @@ -17,7 +17,9 @@ package io.grpc.grpclb; import io.grpc.Attributes; +import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Subchannel; import javax.annotation.concurrent.NotThreadSafe; @@ -30,9 +32,9 @@ import javax.annotation.concurrent.NotThreadSafe; @NotThreadSafe interface SubchannelPool { /** - * Pass essential utilities. + * Pass essential utilities and the balancer that's using this pool. */ - void init(Helper helper); + void init(Helper helper, LoadBalancer lb); /** * Takes a {@link Subchannel} from the pool for the given {@code eag} if there is one available. @@ -42,10 +44,16 @@ interface SubchannelPool { Subchannel takeOrCreateSubchannel(EquivalentAddressGroup eag, Attributes defaultAttributes); /** - * Puts a {@link Subchannel} back to the pool. From this point the Subchannel is owned by the - * pool. + * Gets notified about a state change of Subchannel that is possibly cached in this pool. Do + * nothing if this pool doesn't own this Subchannel. */ - void returnSubchannel(Subchannel subchannel); + void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newStateInfo); + + /** + * Puts a {@link Subchannel} back to the pool. From this point the Subchannel is owned by the + * pool, and the caller should stop referencing to this Subchannel. + */ + void returnSubchannel(Subchannel subchannel, ConnectivityStateInfo lastKnownState); /** * Shuts down all subchannels in the pool immediately. diff --git a/grpclb/src/test/java/io/grpc/grpclb/CachedSubchannelPoolTest.java b/grpclb/src/test/java/io/grpc/grpclb/CachedSubchannelPoolTest.java index e8bc2299ce..9b6a7c811d 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/CachedSubchannelPoolTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/CachedSubchannelPoolTest.java @@ -21,6 +21,7 @@ import static io.grpc.grpclb.CachedSubchannelPool.SHUTDOWN_TIMEOUT_MS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -28,12 +29,17 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.same; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import io.grpc.Attributes; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Subchannel; +import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.grpclb.CachedSubchannelPool.ShutdownSubchannelTask; import io.grpc.internal.FakeClock; @@ -58,6 +64,10 @@ public class CachedSubchannelPoolTest { private static final Attributes.Key ATTR_KEY = Attributes.Key.create("test-attr"); private static final Attributes ATTRS1 = Attributes.newBuilder().set(ATTR_KEY, "1").build(); private static final Attributes ATTRS2 = Attributes.newBuilder().set(ATTR_KEY, "2").build(); + private static final ConnectivityStateInfo READY_STATE = + ConnectivityStateInfo.forNonError(ConnectivityState.READY); + private static final ConnectivityStateInfo TRANSIENT_FAILURE_STATE = + ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription("Simulated")); private static final FakeClock.TaskFilter SHUTDOWN_TASK_FILTER = new FakeClock.TaskFilter() { @Override @@ -69,6 +79,7 @@ public class CachedSubchannelPoolTest { }; private final Helper helper = mock(Helper.class); + private final LoadBalancer balancer = mock(LoadBalancer.class); private final FakeClock clock = new FakeClock(); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @@ -96,9 +107,17 @@ public class CachedSubchannelPoolTest { return subchannel; } }).when(helper).createSubchannel(any(List.class), any(Attributes.class)); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + syncContext.throwIfNotInThisSynchronizationContext(); + return null; + } + }).when(balancer).handleSubchannelState( + any(Subchannel.class), any(ConnectivityStateInfo.class)); when(helper.getSynchronizationContext()).thenReturn(syncContext); when(helper.getScheduledExecutorService()).thenReturn(clock.getScheduledExecutorService()); - pool.init(helper); + pool.init(helper, balancer); } @After @@ -107,6 +126,9 @@ public class CachedSubchannelPoolTest { for (Subchannel subchannel : mockSubchannels) { verify(subchannel, atMost(1)).shutdown(); } + verify(balancer, atLeast(0)) + .handleSubchannelState(any(Subchannel.class), any(ConnectivityStateInfo.class)); + verifyNoMoreInteractions(balancer); } @Test @@ -120,13 +142,13 @@ public class CachedSubchannelPoolTest { assertThat(subchannel2).isNotSameAs(subchannel1); verify(helper).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2)); - pool.returnSubchannel(subchannel1); + pool.returnSubchannel(subchannel1, READY_STATE); // subchannel1 is 1ms away from expiration. clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS); verify(subchannel1, never()).shutdown(); - pool.returnSubchannel(subchannel2); + pool.returnSubchannel(subchannel2, READY_STATE); // subchannel1 expires. subchannel2 is (SHUTDOWN_TIMEOUT_MS - 1) away from expiration. clock.forwardTime(1, MILLISECONDS); @@ -150,7 +172,7 @@ public class CachedSubchannelPoolTest { assertThat(subchannel2).isNotSameAs(subchannel1); verify(helper).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2)); - pool.returnSubchannel(subchannel1); + pool.returnSubchannel(subchannel1, READY_STATE); // subchannel1 is 1ms away from expiration. clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS); @@ -159,7 +181,7 @@ public class CachedSubchannelPoolTest { Subchannel subchannel1a = pool.takeOrCreateSubchannel(EAG1, ATTRS1); assertThat(subchannel1a).isSameAs(subchannel1); - pool.returnSubchannel(subchannel2); + pool.returnSubchannel(subchannel2, READY_STATE); // subchannel2 expires SHUTDOWN_TIMEOUT_MS after being returned clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS); @@ -173,7 +195,7 @@ public class CachedSubchannelPoolTest { verify(helper, times(2)).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2)); // subchannel1 expires SHUTDOWN_TIMEOUT_MS after being returned - pool.returnSubchannel(subchannel1a); + pool.returnSubchannel(subchannel1a, READY_STATE); clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS); verify(subchannel1a, never()).shutdown(); clock.forwardTime(1, MILLISECONDS); @@ -182,6 +204,51 @@ public class CachedSubchannelPoolTest { assertThat(clock.numPendingTasks()).isEqualTo(0); } + @Test + public void updateStateWhileInPool() { + Subchannel subchannel1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1); + Subchannel subchannel2 = pool.takeOrCreateSubchannel(EAG2, ATTRS2); + pool.returnSubchannel(subchannel1, READY_STATE); + pool.returnSubchannel(subchannel2, TRANSIENT_FAILURE_STATE); + + ConnectivityStateInfo anotherFailureState = + ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription("Another")); + + pool.handleSubchannelState(subchannel1, anotherFailureState); + + verify(balancer, never()) + .handleSubchannelState(any(Subchannel.class), any(ConnectivityStateInfo.class)); + + assertThat(pool.takeOrCreateSubchannel(EAG1, ATTRS1)).isSameAs(subchannel1); + verify(balancer).handleSubchannelState(same(subchannel1), same(anotherFailureState)); + verifyNoMoreInteractions(balancer); + + assertThat(pool.takeOrCreateSubchannel(EAG2, ATTRS2)).isSameAs(subchannel2); + verify(balancer).handleSubchannelState(same(subchannel2), same(TRANSIENT_FAILURE_STATE)); + verifyNoMoreInteractions(balancer); + } + + @Test + public void updateStateWhileInPool_notSameObject() { + Subchannel subchannel1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1); + pool.returnSubchannel(subchannel1, READY_STATE); + + Subchannel subchannel2 = helper.createSubchannel(EAG1, ATTRS1); + Subchannel subchannel3 = helper.createSubchannel(EAG2, ATTRS2); + + // subchannel2 is not in the pool, although with the same address + pool.handleSubchannelState(subchannel2, TRANSIENT_FAILURE_STATE); + + // subchannel3 is not in the pool. In fact its address is not in the pool + pool.handleSubchannelState(subchannel3, TRANSIENT_FAILURE_STATE); + + assertThat(pool.takeOrCreateSubchannel(EAG1, ATTRS1)).isSameAs(subchannel1); + + // subchannel1's state is unchanged + verify(balancer).handleSubchannelState(same(subchannel1), same(READY_STATE)); + verifyNoMoreInteractions(balancer); + } + @Test public void returnDuplicateAddressSubchannel() { Subchannel subchannel1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1); @@ -190,20 +257,20 @@ public class CachedSubchannelPoolTest { assertThat(subchannel1).isNotSameAs(subchannel2); assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).isEmpty(); - pool.returnSubchannel(subchannel2); + pool.returnSubchannel(subchannel2, READY_STATE); assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(1); // If the subchannel being returned has an address that is the same as a subchannel in the pool, // the returned subchannel will be shut down. verify(subchannel1, never()).shutdown(); - pool.returnSubchannel(subchannel1); + pool.returnSubchannel(subchannel1, READY_STATE); assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(1); verify(subchannel1).shutdown(); - pool.returnSubchannel(subchannel3); + pool.returnSubchannel(subchannel3, READY_STATE); assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(2); // Returning the same subchannel twice has no effect. - pool.returnSubchannel(subchannel3); + pool.returnSubchannel(subchannel3, READY_STATE); assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(2); verify(subchannel2, never()).shutdown(); @@ -216,8 +283,8 @@ public class CachedSubchannelPoolTest { Subchannel subchannel2 = pool.takeOrCreateSubchannel(EAG2, ATTRS2); Subchannel subchannel3 = pool.takeOrCreateSubchannel(EAG2, ATTRS2); - pool.returnSubchannel(subchannel1); - pool.returnSubchannel(subchannel2); + pool.returnSubchannel(subchannel1, READY_STATE); + pool.returnSubchannel(subchannel2, READY_STATE); verify(subchannel1, never()).shutdown(); verify(subchannel2, never()).shutdown(); diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index ee6e185003..8d2645436a 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -291,7 +291,7 @@ public class GrpclbLoadBalancerTest { when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); balancer = new GrpclbLoadBalancer(helper, subchannelPool, fakeClock.getTimeProvider(), backoffPolicyProvider); - verify(subchannelPool).init(same(helper)); + verify(subchannelPool).init(same(helper), same(balancer)); } @After @@ -313,7 +313,7 @@ public class GrpclbLoadBalancerTest { } // GRPCLB manages subchannels only through subchannelPool for (Subchannel subchannel : pooledSubchannelTracker) { - verify(subchannelPool).returnSubchannel(same(subchannel)); + verify(subchannelPool).returnSubchannel(same(subchannel), any(ConnectivityStateInfo.class)); // Our mock subchannelPool never calls Subchannel.shutdown(), thus we can tell if // LoadBalancer has called it expectedly. verify(subchannel, never()).shutdown(); @@ -1040,8 +1040,9 @@ public class GrpclbLoadBalancerTest { inOrder.verifyNoMoreInteractions(); // As long as there is at least one READY subchannel, round robin will work. - Status error1 = Status.UNAVAILABLE.withDescription("error1"); - deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1)); + ConnectivityStateInfo errorState1 = + ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription("error1")); + deliverSubchannelState(subchannel1, errorState1); inOrder.verifyNoMoreInteractions(); // If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING @@ -1065,7 +1066,8 @@ public class GrpclbLoadBalancerTest { new ServerEntry("127.0.0.1", 2010, "token0004"), // Existing address with token changed new ServerEntry("127.0.0.1", 2030, "token0005"), // New address appearing second time new ServerEntry("token0006")); // drop - verify(subchannelPool, never()).returnSubchannel(same(subchannel1)); + verify(subchannelPool, never()) + .returnSubchannel(same(subchannel1), any(ConnectivityStateInfo.class)); lbResponseObserver.onNext(buildLbResponse(backends2)); assertThat(logs).containsExactly( @@ -1081,9 +1083,10 @@ public class GrpclbLoadBalancerTest { logs.clear(); // not in backends2, closed - verify(subchannelPool).returnSubchannel(same(subchannel1)); + verify(subchannelPool).returnSubchannel(same(subchannel1), same(errorState1)); // backends2[2], will be kept - verify(subchannelPool, never()).returnSubchannel(same(subchannel2)); + verify(subchannelPool, never()) + .returnSubchannel(same(subchannel2), any(ConnectivityStateInfo.class)); inOrder.verify(subchannelPool, never()).takeOrCreateSubchannel( eq(new EquivalentAddressGroup(backends2.get(2).addr, LB_BACKEND_ATTRS)), @@ -1091,6 +1094,13 @@ public class GrpclbLoadBalancerTest { inOrder.verify(subchannelPool).takeOrCreateSubchannel( eq(new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS)), any(Attributes.class)); + + ConnectivityStateInfo errorOnCachedSubchannel1 = + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("You can get this error even if you are cached")); + deliverSubchannelState(subchannel1, errorOnCachedSubchannel1); + verify(subchannelPool).handleSubchannelState(same(subchannel1), same(errorOnCachedSubchannel1)); + assertEquals(1, mockSubchannels.size()); Subchannel subchannel3 = mockSubchannels.poll(); verify(subchannel3).requestConnection(); @@ -1107,11 +1117,15 @@ public class GrpclbLoadBalancerTest { new DropEntry(getLoadRecorder(), "token0006")).inOrder(); assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY); - // State updates on obsolete subchannel1 will have no effect + // State updates on obsolete subchannel1 will only be passed to the pool deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState( subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN)); + inOrder.verify(subchannelPool) + .handleSubchannelState(same(subchannel1), eq(ConnectivityStateInfo.forNonError(READY))); + inOrder.verify(subchannelPool).handleSubchannelState( + same(subchannel1), eq(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE))); inOrder.verifyNoMoreInteractions(); deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); @@ -1141,12 +1155,15 @@ public class GrpclbLoadBalancerTest { new BackendEntry(subchannel3, getLoadRecorder(), "token0003"), new BackendEntry(subchannel2, getLoadRecorder(), "token0004"), new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder(); - verify(subchannelPool, never()).returnSubchannel(same(subchannel3)); + verify(subchannelPool, never()) + .returnSubchannel(same(subchannel3), any(ConnectivityStateInfo.class)); // Update backends, with no entry lbResponseObserver.onNext(buildLbResponse(Collections.emptyList())); - verify(subchannelPool).returnSubchannel(same(subchannel2)); - verify(subchannelPool).returnSubchannel(same(subchannel3)); + verify(subchannelPool) + .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(READY))); + verify(subchannelPool) + .returnSubchannel(same(subchannel3), eq(ConnectivityStateInfo.forNonError(READY))); inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker10.dropList).isEmpty(); @@ -1790,7 +1807,8 @@ public class GrpclbLoadBalancerTest { // PICK_FIRST doesn't use subchannelPool verify(subchannelPool, never()) .takeOrCreateSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); - verify(subchannelPool, never()).returnSubchannel(any(Subchannel.class)); + verify(subchannelPool, never()) + .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); } @SuppressWarnings("unchecked") @@ -1869,7 +1887,8 @@ public class GrpclbLoadBalancerTest { // PICK_FIRST doesn't use subchannelPool verify(subchannelPool, never()) .takeOrCreateSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); - verify(subchannelPool, never()).returnSubchannel(any(Subchannel.class)); + verify(subchannelPool, never()) + .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); } @Test @@ -1914,7 +1933,8 @@ public class GrpclbLoadBalancerTest { assertEquals(2, mockSubchannels.size()); Subchannel subchannel1 = mockSubchannels.poll(); Subchannel subchannel2 = mockSubchannels.poll(); - verify(subchannelPool, never()).returnSubchannel(any(Subchannel.class)); + verify(subchannelPool, never()) + .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); // Switch to PICK_FIRST lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}"; @@ -1925,8 +1945,10 @@ public class GrpclbLoadBalancerTest { // GrpclbState will be shutdown, and a new one will be created assertThat(oobChannel.isShutdown()).isTrue(); - verify(subchannelPool).returnSubchannel(same(subchannel1)); - verify(subchannelPool).returnSubchannel(same(subchannel2)); + verify(subchannelPool) + .returnSubchannel(same(subchannel1), eq(ConnectivityStateInfo.forNonError(IDLE))); + verify(subchannelPool) + .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE))); // A new LB stream is created assertEquals(1, fakeOobChannels.size());