grpclb: keep track of state updates for cached Subchannels. (#5441)

The problem: GrpclbState tracks Subchannels' states as a mutable
attribute in Subchannel.getAttributes(). However, GrpclbState only
update this attribute for the Subchannels its managing. For those
cached in SubchannelPool, their state attributes are stale. When they
are given back to GrpclbState, IDLE state is assumed.  As a result, if
a Subchannel is READY when it's reclaimed from the pool, it will not
be picked.

To fix that, this change expands SubchannelPool interface to handle
Subchannel state updates, which GrpclbState will call. SubchannelPool
saves the latest state and delivers it when it's returned to
GrpclbState by scheduling a call to handleSubchannelState() in the
SynchronizationContext, so that GrpclbState will take the latest state
as if it was just reported from the Channel.
This commit is contained in:
Kun Zhang 2019-03-07 13:14:21 -08:00 committed by GitHub
parent 6b0325c84f
commit 034675e555
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 177 additions and 45 deletions

View File

@ -21,7 +21,9 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.SynchronizationContext.ScheduledHandle;
@ -37,31 +39,51 @@ final class CachedSubchannelPool implements SubchannelPool {
new HashMap<>(); new HashMap<>();
private Helper helper; private Helper helper;
private LoadBalancer lb;
@VisibleForTesting @VisibleForTesting
static final long SHUTDOWN_TIMEOUT_MS = 10000; static final long SHUTDOWN_TIMEOUT_MS = 10000;
@Override @Override
public void init(Helper helper) { public void init(Helper helper, LoadBalancer lb) {
this.helper = checkNotNull(helper, "helper"); this.helper = checkNotNull(helper, "helper");
this.lb = checkNotNull(lb, "lb");
} }
@Override @Override
public Subchannel takeOrCreateSubchannel( public Subchannel takeOrCreateSubchannel(
EquivalentAddressGroup eag, Attributes defaultAttributes) { EquivalentAddressGroup eag, Attributes defaultAttributes) {
CacheEntry entry = cache.remove(eag); final CacheEntry entry = cache.remove(eag);
Subchannel subchannel; final Subchannel subchannel;
if (entry == null) { if (entry == null) {
subchannel = helper.createSubchannel(eag, defaultAttributes); subchannel = helper.createSubchannel(eag, defaultAttributes);
} else { } else {
subchannel = entry.subchannel; subchannel = entry.subchannel;
entry.shutdownTimer.cancel(); entry.shutdownTimer.cancel();
// Make the balancer up-to-date with the latest state in case it has changed while it's
// in the cache.
helper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
lb.handleSubchannelState(subchannel, entry.state);
}
});
} }
return subchannel; return subchannel;
} }
@Override @Override
public void returnSubchannel(Subchannel subchannel) { public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newStateInfo) {
CacheEntry cached = cache.get(subchannel.getAddresses());
if (cached == null || cached.subchannel != subchannel) {
// Given subchannel is not cached. Not our responsibility.
return;
}
cached.state = newStateInfo;
}
@Override
public void returnSubchannel(Subchannel subchannel, ConnectivityStateInfo lastKnownState) {
CacheEntry prev = cache.get(subchannel.getAddresses()); CacheEntry prev = cache.get(subchannel.getAddresses());
if (prev != null) { if (prev != null) {
// Returning the same Subchannel twice has no effect. // Returning the same Subchannel twice has no effect.
@ -77,7 +99,7 @@ final class CachedSubchannelPool implements SubchannelPool {
helper.getSynchronizationContext().schedule( helper.getSynchronizationContext().schedule(
shutdownTask, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS, shutdownTask, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS,
helper.getScheduledExecutorService()); helper.getScheduledExecutorService());
CacheEntry entry = new CacheEntry(subchannel, shutdownTimer); CacheEntry entry = new CacheEntry(subchannel, shutdownTimer, lastKnownState);
cache.put(subchannel.getAddresses(), entry); cache.put(subchannel.getAddresses(), entry);
} }
@ -110,10 +132,12 @@ final class CachedSubchannelPool implements SubchannelPool {
private static class CacheEntry { private static class CacheEntry {
final Subchannel subchannel; final Subchannel subchannel;
final ScheduledHandle shutdownTimer; final ScheduledHandle shutdownTimer;
ConnectivityStateInfo state;
CacheEntry(Subchannel subchannel, ScheduledHandle shutdownTimer) { CacheEntry(Subchannel subchannel, ScheduledHandle shutdownTimer, ConnectivityStateInfo state) {
this.subchannel = checkNotNull(subchannel, "subchannel"); this.subchannel = checkNotNull(subchannel, "subchannel");
this.shutdownTimer = checkNotNull(shutdownTimer, "shutdownTimer"); this.shutdownTimer = checkNotNull(shutdownTimer, "shutdownTimer");
this.state = checkNotNull(state, "state");
} }
} }
} }

View File

@ -71,7 +71,7 @@ class GrpclbLoadBalancer extends LoadBalancer {
this.time = checkNotNull(time, "time provider"); this.time = checkNotNull(time, "time provider");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool"); this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool");
this.subchannelPool.init(helper); this.subchannelPool.init(helper, this);
recreateStates(); recreateStates();
checkNotNull(grpclbState, "grpclbState"); checkNotNull(grpclbState, "grpclbState");
} }

View File

@ -159,7 +159,8 @@ final class GrpclbState {
this.mode = checkNotNull(mode, "mode"); this.mode = checkNotNull(mode, "mode");
this.helper = checkNotNull(helper, "helper"); this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.subchannelPool = checkNotNull(subchannelPool, "subchannelPool"); this.subchannelPool =
mode == Mode.ROUND_ROBIN ? checkNotNull(subchannelPool, "subchannelPool") : null;
this.time = checkNotNull(time, "time provider"); this.time = checkNotNull(time, "time provider");
this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService"); this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
@ -168,7 +169,13 @@ final class GrpclbState {
} }
void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) { void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
if (newState.getState() == SHUTDOWN || !subchannels.values().contains(subchannel)) { if (newState.getState() == SHUTDOWN) {
return;
}
if (!subchannels.values().contains(subchannel)) {
if (subchannelPool != null ) {
subchannelPool.handleSubchannelState(subchannel, newState);
}
return; return;
} }
if (mode == Mode.ROUND_ROBIN && newState.getState() == IDLE) { if (mode == Mode.ROUND_ROBIN && newState.getState() == IDLE) {
@ -311,8 +318,9 @@ final class GrpclbState {
// We close the subchannels through subchannelPool instead of helper just for convenience of // We close the subchannels through subchannelPool instead of helper just for convenience of
// testing. // testing.
for (Subchannel subchannel : subchannels.values()) { for (Subchannel subchannel : subchannels.values()) {
subchannelPool.returnSubchannel(subchannel); returnSubchannelToPool(subchannel);
} }
subchannelPool.clear();
break; break;
case PICK_FIRST: case PICK_FIRST:
checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels); checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels);
@ -322,7 +330,6 @@ final class GrpclbState {
throw new AssertionError("Missing case for " + mode); throw new AssertionError("Missing case for " + mode);
} }
subchannels = Collections.emptyMap(); subchannels = Collections.emptyMap();
subchannelPool.clear();
cancelFallbackTimer(); cancelFallbackTimer();
cancelLbRpcRetryTimer(); cancelLbRpcRetryTimer();
} }
@ -335,6 +342,10 @@ final class GrpclbState {
} }
} }
private void returnSubchannelToPool(Subchannel subchannel) {
subchannelPool.returnSubchannel(subchannel, subchannel.getAttributes().get(STATE_INFO).get());
}
@VisibleForTesting @VisibleForTesting
@Nullable @Nullable
GrpclbClientLoadRecorder getLoadRecorder() { GrpclbClientLoadRecorder getLoadRecorder() {
@ -383,7 +394,7 @@ final class GrpclbState {
for (Entry<List<EquivalentAddressGroup>, Subchannel> entry : subchannels.entrySet()) { for (Entry<List<EquivalentAddressGroup>, Subchannel> entry : subchannels.entrySet()) {
List<EquivalentAddressGroup> eagList = entry.getKey(); List<EquivalentAddressGroup> eagList = entry.getKey();
if (!newSubchannelMap.containsKey(eagList)) { if (!newSubchannelMap.containsKey(eagList)) {
subchannelPool.returnSubchannel(entry.getValue()); returnSubchannelToPool(entry.getValue());
} }
} }
subchannels = Collections.unmodifiableMap(newSubchannelMap); subchannels = Collections.unmodifiableMap(newSubchannelMap);

View File

@ -17,7 +17,9 @@
package io.grpc.grpclb; package io.grpc.grpclb;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;
@ -30,9 +32,9 @@ import javax.annotation.concurrent.NotThreadSafe;
@NotThreadSafe @NotThreadSafe
interface SubchannelPool { interface SubchannelPool {
/** /**
* Pass essential utilities. * Pass essential utilities and the balancer that's using this pool.
*/ */
void init(Helper helper); void init(Helper helper, LoadBalancer lb);
/** /**
* Takes a {@link Subchannel} from the pool for the given {@code eag} if there is one available. * Takes a {@link Subchannel} from the pool for the given {@code eag} if there is one available.
@ -42,10 +44,16 @@ interface SubchannelPool {
Subchannel takeOrCreateSubchannel(EquivalentAddressGroup eag, Attributes defaultAttributes); Subchannel takeOrCreateSubchannel(EquivalentAddressGroup eag, Attributes defaultAttributes);
/** /**
* Puts a {@link Subchannel} back to the pool. From this point the Subchannel is owned by the * Gets notified about a state change of Subchannel that is possibly cached in this pool. Do
* pool. * nothing if this pool doesn't own this Subchannel.
*/ */
void returnSubchannel(Subchannel subchannel); void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newStateInfo);
/**
* Puts a {@link Subchannel} back to the pool. From this point the Subchannel is owned by the
* pool, and the caller should stop referencing to this Subchannel.
*/
void returnSubchannel(Subchannel subchannel, ConnectivityStateInfo lastKnownState);
/** /**
* Shuts down all subchannels in the pool immediately. * Shuts down all subchannels in the pool immediately.

View File

@ -21,6 +21,7 @@ import static io.grpc.grpclb.CachedSubchannelPool.SHUTDOWN_TIMEOUT_MS;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any; import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -28,12 +29,17 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.same; import static org.mockito.Mockito.same;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.grpclb.CachedSubchannelPool.ShutdownSubchannelTask; import io.grpc.grpclb.CachedSubchannelPool.ShutdownSubchannelTask;
import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock;
@ -58,6 +64,10 @@ public class CachedSubchannelPoolTest {
private static final Attributes.Key<String> ATTR_KEY = Attributes.Key.create("test-attr"); private static final Attributes.Key<String> ATTR_KEY = Attributes.Key.create("test-attr");
private static final Attributes ATTRS1 = Attributes.newBuilder().set(ATTR_KEY, "1").build(); private static final Attributes ATTRS1 = Attributes.newBuilder().set(ATTR_KEY, "1").build();
private static final Attributes ATTRS2 = Attributes.newBuilder().set(ATTR_KEY, "2").build(); private static final Attributes ATTRS2 = Attributes.newBuilder().set(ATTR_KEY, "2").build();
private static final ConnectivityStateInfo READY_STATE =
ConnectivityStateInfo.forNonError(ConnectivityState.READY);
private static final ConnectivityStateInfo TRANSIENT_FAILURE_STATE =
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription("Simulated"));
private static final FakeClock.TaskFilter SHUTDOWN_TASK_FILTER = private static final FakeClock.TaskFilter SHUTDOWN_TASK_FILTER =
new FakeClock.TaskFilter() { new FakeClock.TaskFilter() {
@Override @Override
@ -69,6 +79,7 @@ public class CachedSubchannelPoolTest {
}; };
private final Helper helper = mock(Helper.class); private final Helper helper = mock(Helper.class);
private final LoadBalancer balancer = mock(LoadBalancer.class);
private final FakeClock clock = new FakeClock(); private final FakeClock clock = new FakeClock();
private final SynchronizationContext syncContext = new SynchronizationContext( private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() { new Thread.UncaughtExceptionHandler() {
@ -96,9 +107,17 @@ public class CachedSubchannelPoolTest {
return subchannel; return subchannel;
} }
}).when(helper).createSubchannel(any(List.class), any(Attributes.class)); }).when(helper).createSubchannel(any(List.class), any(Attributes.class));
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
syncContext.throwIfNotInThisSynchronizationContext();
return null;
}
}).when(balancer).handleSubchannelState(
any(Subchannel.class), any(ConnectivityStateInfo.class));
when(helper.getSynchronizationContext()).thenReturn(syncContext); when(helper.getSynchronizationContext()).thenReturn(syncContext);
when(helper.getScheduledExecutorService()).thenReturn(clock.getScheduledExecutorService()); when(helper.getScheduledExecutorService()).thenReturn(clock.getScheduledExecutorService());
pool.init(helper); pool.init(helper, balancer);
} }
@After @After
@ -107,6 +126,9 @@ public class CachedSubchannelPoolTest {
for (Subchannel subchannel : mockSubchannels) { for (Subchannel subchannel : mockSubchannels) {
verify(subchannel, atMost(1)).shutdown(); verify(subchannel, atMost(1)).shutdown();
} }
verify(balancer, atLeast(0))
.handleSubchannelState(any(Subchannel.class), any(ConnectivityStateInfo.class));
verifyNoMoreInteractions(balancer);
} }
@Test @Test
@ -120,13 +142,13 @@ public class CachedSubchannelPoolTest {
assertThat(subchannel2).isNotSameAs(subchannel1); assertThat(subchannel2).isNotSameAs(subchannel1);
verify(helper).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2)); verify(helper).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2));
pool.returnSubchannel(subchannel1); pool.returnSubchannel(subchannel1, READY_STATE);
// subchannel1 is 1ms away from expiration. // subchannel1 is 1ms away from expiration.
clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS); clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS);
verify(subchannel1, never()).shutdown(); verify(subchannel1, never()).shutdown();
pool.returnSubchannel(subchannel2); pool.returnSubchannel(subchannel2, READY_STATE);
// subchannel1 expires. subchannel2 is (SHUTDOWN_TIMEOUT_MS - 1) away from expiration. // subchannel1 expires. subchannel2 is (SHUTDOWN_TIMEOUT_MS - 1) away from expiration.
clock.forwardTime(1, MILLISECONDS); clock.forwardTime(1, MILLISECONDS);
@ -150,7 +172,7 @@ public class CachedSubchannelPoolTest {
assertThat(subchannel2).isNotSameAs(subchannel1); assertThat(subchannel2).isNotSameAs(subchannel1);
verify(helper).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2)); verify(helper).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2));
pool.returnSubchannel(subchannel1); pool.returnSubchannel(subchannel1, READY_STATE);
// subchannel1 is 1ms away from expiration. // subchannel1 is 1ms away from expiration.
clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS); clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS);
@ -159,7 +181,7 @@ public class CachedSubchannelPoolTest {
Subchannel subchannel1a = pool.takeOrCreateSubchannel(EAG1, ATTRS1); Subchannel subchannel1a = pool.takeOrCreateSubchannel(EAG1, ATTRS1);
assertThat(subchannel1a).isSameAs(subchannel1); assertThat(subchannel1a).isSameAs(subchannel1);
pool.returnSubchannel(subchannel2); pool.returnSubchannel(subchannel2, READY_STATE);
// subchannel2 expires SHUTDOWN_TIMEOUT_MS after being returned // subchannel2 expires SHUTDOWN_TIMEOUT_MS after being returned
clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS); clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS);
@ -173,7 +195,7 @@ public class CachedSubchannelPoolTest {
verify(helper, times(2)).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2)); verify(helper, times(2)).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2));
// subchannel1 expires SHUTDOWN_TIMEOUT_MS after being returned // subchannel1 expires SHUTDOWN_TIMEOUT_MS after being returned
pool.returnSubchannel(subchannel1a); pool.returnSubchannel(subchannel1a, READY_STATE);
clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS); clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS);
verify(subchannel1a, never()).shutdown(); verify(subchannel1a, never()).shutdown();
clock.forwardTime(1, MILLISECONDS); clock.forwardTime(1, MILLISECONDS);
@ -182,6 +204,51 @@ public class CachedSubchannelPoolTest {
assertThat(clock.numPendingTasks()).isEqualTo(0); assertThat(clock.numPendingTasks()).isEqualTo(0);
} }
@Test
public void updateStateWhileInPool() {
Subchannel subchannel1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1);
Subchannel subchannel2 = pool.takeOrCreateSubchannel(EAG2, ATTRS2);
pool.returnSubchannel(subchannel1, READY_STATE);
pool.returnSubchannel(subchannel2, TRANSIENT_FAILURE_STATE);
ConnectivityStateInfo anotherFailureState =
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription("Another"));
pool.handleSubchannelState(subchannel1, anotherFailureState);
verify(balancer, never())
.handleSubchannelState(any(Subchannel.class), any(ConnectivityStateInfo.class));
assertThat(pool.takeOrCreateSubchannel(EAG1, ATTRS1)).isSameAs(subchannel1);
verify(balancer).handleSubchannelState(same(subchannel1), same(anotherFailureState));
verifyNoMoreInteractions(balancer);
assertThat(pool.takeOrCreateSubchannel(EAG2, ATTRS2)).isSameAs(subchannel2);
verify(balancer).handleSubchannelState(same(subchannel2), same(TRANSIENT_FAILURE_STATE));
verifyNoMoreInteractions(balancer);
}
@Test
public void updateStateWhileInPool_notSameObject() {
Subchannel subchannel1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1);
pool.returnSubchannel(subchannel1, READY_STATE);
Subchannel subchannel2 = helper.createSubchannel(EAG1, ATTRS1);
Subchannel subchannel3 = helper.createSubchannel(EAG2, ATTRS2);
// subchannel2 is not in the pool, although with the same address
pool.handleSubchannelState(subchannel2, TRANSIENT_FAILURE_STATE);
// subchannel3 is not in the pool. In fact its address is not in the pool
pool.handleSubchannelState(subchannel3, TRANSIENT_FAILURE_STATE);
assertThat(pool.takeOrCreateSubchannel(EAG1, ATTRS1)).isSameAs(subchannel1);
// subchannel1's state is unchanged
verify(balancer).handleSubchannelState(same(subchannel1), same(READY_STATE));
verifyNoMoreInteractions(balancer);
}
@Test @Test
public void returnDuplicateAddressSubchannel() { public void returnDuplicateAddressSubchannel() {
Subchannel subchannel1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1); Subchannel subchannel1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1);
@ -190,20 +257,20 @@ public class CachedSubchannelPoolTest {
assertThat(subchannel1).isNotSameAs(subchannel2); assertThat(subchannel1).isNotSameAs(subchannel2);
assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).isEmpty(); assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).isEmpty();
pool.returnSubchannel(subchannel2); pool.returnSubchannel(subchannel2, READY_STATE);
assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(1); assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(1);
// If the subchannel being returned has an address that is the same as a subchannel in the pool, // If the subchannel being returned has an address that is the same as a subchannel in the pool,
// the returned subchannel will be shut down. // the returned subchannel will be shut down.
verify(subchannel1, never()).shutdown(); verify(subchannel1, never()).shutdown();
pool.returnSubchannel(subchannel1); pool.returnSubchannel(subchannel1, READY_STATE);
assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(1); assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(1);
verify(subchannel1).shutdown(); verify(subchannel1).shutdown();
pool.returnSubchannel(subchannel3); pool.returnSubchannel(subchannel3, READY_STATE);
assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(2); assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(2);
// Returning the same subchannel twice has no effect. // Returning the same subchannel twice has no effect.
pool.returnSubchannel(subchannel3); pool.returnSubchannel(subchannel3, READY_STATE);
assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(2); assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(2);
verify(subchannel2, never()).shutdown(); verify(subchannel2, never()).shutdown();
@ -216,8 +283,8 @@ public class CachedSubchannelPoolTest {
Subchannel subchannel2 = pool.takeOrCreateSubchannel(EAG2, ATTRS2); Subchannel subchannel2 = pool.takeOrCreateSubchannel(EAG2, ATTRS2);
Subchannel subchannel3 = pool.takeOrCreateSubchannel(EAG2, ATTRS2); Subchannel subchannel3 = pool.takeOrCreateSubchannel(EAG2, ATTRS2);
pool.returnSubchannel(subchannel1); pool.returnSubchannel(subchannel1, READY_STATE);
pool.returnSubchannel(subchannel2); pool.returnSubchannel(subchannel2, READY_STATE);
verify(subchannel1, never()).shutdown(); verify(subchannel1, never()).shutdown();
verify(subchannel2, never()).shutdown(); verify(subchannel2, never()).shutdown();

View File

@ -291,7 +291,7 @@ public class GrpclbLoadBalancerTest {
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
balancer = new GrpclbLoadBalancer(helper, subchannelPool, fakeClock.getTimeProvider(), balancer = new GrpclbLoadBalancer(helper, subchannelPool, fakeClock.getTimeProvider(),
backoffPolicyProvider); backoffPolicyProvider);
verify(subchannelPool).init(same(helper)); verify(subchannelPool).init(same(helper), same(balancer));
} }
@After @After
@ -313,7 +313,7 @@ public class GrpclbLoadBalancerTest {
} }
// GRPCLB manages subchannels only through subchannelPool // GRPCLB manages subchannels only through subchannelPool
for (Subchannel subchannel : pooledSubchannelTracker) { for (Subchannel subchannel : pooledSubchannelTracker) {
verify(subchannelPool).returnSubchannel(same(subchannel)); verify(subchannelPool).returnSubchannel(same(subchannel), any(ConnectivityStateInfo.class));
// Our mock subchannelPool never calls Subchannel.shutdown(), thus we can tell if // Our mock subchannelPool never calls Subchannel.shutdown(), thus we can tell if
// LoadBalancer has called it expectedly. // LoadBalancer has called it expectedly.
verify(subchannel, never()).shutdown(); verify(subchannel, never()).shutdown();
@ -1040,8 +1040,9 @@ public class GrpclbLoadBalancerTest {
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// As long as there is at least one READY subchannel, round robin will work. // As long as there is at least one READY subchannel, round robin will work.
Status error1 = Status.UNAVAILABLE.withDescription("error1"); ConnectivityStateInfo errorState1 =
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1)); ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE.withDescription("error1"));
deliverSubchannelState(subchannel1, errorState1);
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING // If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING
@ -1065,7 +1066,8 @@ public class GrpclbLoadBalancerTest {
new ServerEntry("127.0.0.1", 2010, "token0004"), // Existing address with token changed new ServerEntry("127.0.0.1", 2010, "token0004"), // Existing address with token changed
new ServerEntry("127.0.0.1", 2030, "token0005"), // New address appearing second time new ServerEntry("127.0.0.1", 2030, "token0005"), // New address appearing second time
new ServerEntry("token0006")); // drop new ServerEntry("token0006")); // drop
verify(subchannelPool, never()).returnSubchannel(same(subchannel1)); verify(subchannelPool, never())
.returnSubchannel(same(subchannel1), any(ConnectivityStateInfo.class));
lbResponseObserver.onNext(buildLbResponse(backends2)); lbResponseObserver.onNext(buildLbResponse(backends2));
assertThat(logs).containsExactly( assertThat(logs).containsExactly(
@ -1081,9 +1083,10 @@ public class GrpclbLoadBalancerTest {
logs.clear(); logs.clear();
// not in backends2, closed // not in backends2, closed
verify(subchannelPool).returnSubchannel(same(subchannel1)); verify(subchannelPool).returnSubchannel(same(subchannel1), same(errorState1));
// backends2[2], will be kept // backends2[2], will be kept
verify(subchannelPool, never()).returnSubchannel(same(subchannel2)); verify(subchannelPool, never())
.returnSubchannel(same(subchannel2), any(ConnectivityStateInfo.class));
inOrder.verify(subchannelPool, never()).takeOrCreateSubchannel( inOrder.verify(subchannelPool, never()).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends2.get(2).addr, LB_BACKEND_ATTRS)), eq(new EquivalentAddressGroup(backends2.get(2).addr, LB_BACKEND_ATTRS)),
@ -1091,6 +1094,13 @@ public class GrpclbLoadBalancerTest {
inOrder.verify(subchannelPool).takeOrCreateSubchannel( inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS)), eq(new EquivalentAddressGroup(backends2.get(0).addr, LB_BACKEND_ATTRS)),
any(Attributes.class)); any(Attributes.class));
ConnectivityStateInfo errorOnCachedSubchannel1 =
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("You can get this error even if you are cached"));
deliverSubchannelState(subchannel1, errorOnCachedSubchannel1);
verify(subchannelPool).handleSubchannelState(same(subchannel1), same(errorOnCachedSubchannel1));
assertEquals(1, mockSubchannels.size()); assertEquals(1, mockSubchannels.size());
Subchannel subchannel3 = mockSubchannels.poll(); Subchannel subchannel3 = mockSubchannels.poll();
verify(subchannel3).requestConnection(); verify(subchannel3).requestConnection();
@ -1107,11 +1117,15 @@ public class GrpclbLoadBalancerTest {
new DropEntry(getLoadRecorder(), "token0006")).inOrder(); new DropEntry(getLoadRecorder(), "token0006")).inOrder();
assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY); assertThat(picker7.pickList).containsExactly(BUFFER_ENTRY);
// State updates on obsolete subchannel1 will have no effect // State updates on obsolete subchannel1 will only be passed to the pool
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
deliverSubchannelState( deliverSubchannelState(
subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); subchannel1, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN)); deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(SHUTDOWN));
inOrder.verify(subchannelPool)
.handleSubchannelState(same(subchannel1), eq(ConnectivityStateInfo.forNonError(READY)));
inOrder.verify(subchannelPool).handleSubchannelState(
same(subchannel1), eq(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)));
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
@ -1141,12 +1155,15 @@ public class GrpclbLoadBalancerTest {
new BackendEntry(subchannel3, getLoadRecorder(), "token0003"), new BackendEntry(subchannel3, getLoadRecorder(), "token0003"),
new BackendEntry(subchannel2, getLoadRecorder(), "token0004"), new BackendEntry(subchannel2, getLoadRecorder(), "token0004"),
new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder(); new BackendEntry(subchannel3, getLoadRecorder(), "token0005")).inOrder();
verify(subchannelPool, never()).returnSubchannel(same(subchannel3)); verify(subchannelPool, never())
.returnSubchannel(same(subchannel3), any(ConnectivityStateInfo.class));
// Update backends, with no entry // Update backends, with no entry
lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList())); lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
verify(subchannelPool).returnSubchannel(same(subchannel2)); verify(subchannelPool)
verify(subchannelPool).returnSubchannel(same(subchannel3)); .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(READY)));
verify(subchannelPool)
.returnSubchannel(same(subchannel3), eq(ConnectivityStateInfo.forNonError(READY)));
inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue(); RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker10.dropList).isEmpty(); assertThat(picker10.dropList).isEmpty();
@ -1790,7 +1807,8 @@ public class GrpclbLoadBalancerTest {
// PICK_FIRST doesn't use subchannelPool // PICK_FIRST doesn't use subchannelPool
verify(subchannelPool, never()) verify(subchannelPool, never())
.takeOrCreateSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); .takeOrCreateSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
verify(subchannelPool, never()).returnSubchannel(any(Subchannel.class)); verify(subchannelPool, never())
.returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -1869,7 +1887,8 @@ public class GrpclbLoadBalancerTest {
// PICK_FIRST doesn't use subchannelPool // PICK_FIRST doesn't use subchannelPool
verify(subchannelPool, never()) verify(subchannelPool, never())
.takeOrCreateSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); .takeOrCreateSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
verify(subchannelPool, never()).returnSubchannel(any(Subchannel.class)); verify(subchannelPool, never())
.returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
} }
@Test @Test
@ -1914,7 +1933,8 @@ public class GrpclbLoadBalancerTest {
assertEquals(2, mockSubchannels.size()); assertEquals(2, mockSubchannels.size());
Subchannel subchannel1 = mockSubchannels.poll(); Subchannel subchannel1 = mockSubchannels.poll();
Subchannel subchannel2 = mockSubchannels.poll(); Subchannel subchannel2 = mockSubchannels.poll();
verify(subchannelPool, never()).returnSubchannel(any(Subchannel.class)); verify(subchannelPool, never())
.returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
// Switch to PICK_FIRST // Switch to PICK_FIRST
lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}"; lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}";
@ -1925,8 +1945,10 @@ public class GrpclbLoadBalancerTest {
// GrpclbState will be shutdown, and a new one will be created // GrpclbState will be shutdown, and a new one will be created
assertThat(oobChannel.isShutdown()).isTrue(); assertThat(oobChannel.isShutdown()).isTrue();
verify(subchannelPool).returnSubchannel(same(subchannel1)); verify(subchannelPool)
verify(subchannelPool).returnSubchannel(same(subchannel2)); .returnSubchannel(same(subchannel1), eq(ConnectivityStateInfo.forNonError(IDLE)));
verify(subchannelPool)
.returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(IDLE)));
// A new LB stream is created // A new LB stream is created
assertEquals(1, fakeOobChannels.size()); assertEquals(1, fakeOobChannels.size());