core: remove redundant SubchannelPicker refreshes in RoundRobinLoadBalancer

* Remove redundant SubchannelPicker refreshes in RoundRobinLoadBalancer

- Ensure active subchannel list and round-robin index is only
regenerated/refreshed when it changes
- Make it so that Subchannels exist in subchannels map iff their state
!= SHUTDOWN
- Add EmptyPicker class since logic for this case is disjoint from the
non-empty case

* remove explicit initialization of boolean ready field

per @carl-mastrangelo's review comment

* minor restructuring to make logic clearer; more explanatory comments

* move some checks inside updateBalancingState method for clarity

* store current state and picker in RRLB, only update when new one is diff

* some more simplification/refactoring; improve test coverage

- remove now redundant check in handleSubchannelState

- collapse getAggregatedState() and getAggregatedError() into
handleBalancingState()

- have both pickers extend new RoundRobinPicker, move
areEquivalentPickers() logic into RoundRobinPicker.isEquivalentTo()

- extend unit tests to cover some additional cases

* Address latest review comments from @zhangkun83

- Use explicit check for non-empty list instead of assert
- Change EmptyPicker.status to be non-nullable
- Further test coverage improvement including explicit picker comparison
tests

* use EMPTY_OK instead of Status.OK for initial empty picker
This commit is contained in:
Nick Hill 2018-09-12 23:06:46 +01:00 committed by Carl Mastrangelo
parent 7270938ebf
commit ed709ff9ff
2 changed files with 233 additions and 133 deletions

View File

@ -24,6 +24,9 @@ import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
@ -42,12 +45,10 @@ import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ServiceConfigUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
@ -113,6 +114,9 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
new HashMap<EquivalentAddressGroup, Subchannel>();
private final Random random;
private ConnectivityState currentState;
private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK);
@Nullable
private StickinessState stickinessState;
@ -176,50 +180,92 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
// Shutdown subchannels for removed addresses.
for (EquivalentAddressGroup addressGroup : removedAddrs) {
Subchannel subchannel = subchannels.remove(addressGroup);
subchannel.shutdown();
shutdownSubchannel(subchannel);
}
updateBalancingState(getAggregatedState(), getAggregatedError());
updateBalancingState();
}
@Override
public void handleNameResolutionError(Status error) {
updateBalancingState(TRANSIENT_FAILURE, error);
// ready pickers aren't affected by status changes
updateBalancingState(TRANSIENT_FAILURE,
currentPicker instanceof ReadyPicker ? currentPicker : new EmptyPicker(error));
}
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
if (stateInfo.getState() == SHUTDOWN && stickinessState != null) {
stickinessState.remove(subchannel);
}
if (subchannels.get(subchannel.getAddresses()) != subchannel) {
return;
}
if (stateInfo.getState() == SHUTDOWN && stickinessState != null) {
stickinessState.remove(subchannel);
}
if (stateInfo.getState() == IDLE) {
subchannel.requestConnection();
}
getSubchannelStateInfoRef(subchannel).value = stateInfo;
updateBalancingState(getAggregatedState(), getAggregatedError());
updateBalancingState();
}
private void shutdownSubchannel(Subchannel subchannel) {
subchannel.shutdown();
getSubchannelStateInfoRef(subchannel).value =
ConnectivityStateInfo.forNonError(SHUTDOWN);
if (stickinessState != null) {
stickinessState.remove(subchannel);
}
}
@Override
public void shutdown() {
for (Subchannel subchannel : getSubchannels()) {
subchannel.shutdown();
shutdownSubchannel(subchannel);
}
}
private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready");
/**
* Updates picker with the list of active subchannels (state == READY).
*/
private void updateBalancingState(ConnectivityState state, Status error) {
@SuppressWarnings("ReferenceEquality")
private void updateBalancingState() {
List<Subchannel> activeList = filterNonFailingSubchannels(getSubchannels());
// initialize the Picker to a random start index to ensure that a high frequency of Picker
// churn does not skew subchannel selection.
int startIndex = activeList.isEmpty() ? 0 : random.nextInt(activeList.size());
helper.updateBalancingState(
state,
new Picker(activeList, error, startIndex, stickinessState));
if (activeList.isEmpty()) {
// No READY subchannels, determine aggregate state and error status
boolean isConnecting = false;
Status aggStatus = EMPTY_OK;
for (Subchannel subchannel : getSubchannels()) {
ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value;
// This subchannel IDLE is not because of channel IDLE_TIMEOUT,
// in which case LB is already shutdown.
// RRLB will request connection immediately on subchannel IDLE.
if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) {
isConnecting = true;
}
if (aggStatus == EMPTY_OK || !aggStatus.isOk()) {
aggStatus = stateInfo.getStatus();
}
}
updateBalancingState(isConnecting ? CONNECTING : TRANSIENT_FAILURE,
// If all subchannels are TRANSIENT_FAILURE, return the Status associated with
// an arbitrary subchannel, otherwise return OK.
new EmptyPicker(aggStatus));
} else {
// initialize the Picker to a random start index to ensure that a high frequency of Picker
// churn does not skew subchannel selection.
int startIndex = random.nextInt(activeList.size());
updateBalancingState(READY, new ReadyPicker(activeList, startIndex, stickinessState));
}
}
private void updateBalancingState(ConnectivityState state, RoundRobinPicker picker) {
if (state != currentState || !picker.isEquivalentTo(currentPicker)) {
helper.updateBalancingState(state, picker);
currentState = state;
currentPicker = picker;
}
}
/**
@ -229,7 +275,7 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
Collection<Subchannel> subchannels) {
List<Subchannel> readySubchannels = new ArrayList<Subchannel>(subchannels.size());
for (Subchannel subchannel : subchannels) {
if (getSubchannelStateInfoRef(subchannel).value.getState() == READY) {
if (isReady(subchannel)) {
readySubchannels.add(subchannel);
}
}
@ -248,43 +294,6 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
return addrs;
}
/**
* If all subchannels are TRANSIENT_FAILURE, return the Status associated with an arbitrary
* subchannel otherwise, return null.
*/
@Nullable
private Status getAggregatedError() {
Status status = null;
for (Subchannel subchannel : getSubchannels()) {
ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value;
if (stateInfo.getState() != TRANSIENT_FAILURE) {
return null;
}
status = stateInfo.getStatus();
}
return status;
}
private ConnectivityState getAggregatedState() {
Set<ConnectivityState> states = EnumSet.noneOf(ConnectivityState.class);
for (Subchannel subchannel : getSubchannels()) {
states.add(getSubchannelStateInfoRef(subchannel).value.getState());
}
if (states.contains(READY)) {
return READY;
}
if (states.contains(CONNECTING)) {
return CONNECTING;
}
if (states.contains(IDLE)) {
// This subchannel IDLE is not because of channel IDLE_TIMEOUT, in which case LB is already
// shutdown.
// RRLB will request connection immediately on subchannel IDLE.
return CONNECTING;
}
return TRANSIENT_FAILURE;
}
@VisibleForTesting
Collection<Subchannel> getSubchannels() {
return subchannels.values();
@ -294,6 +303,11 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
Subchannel subchannel) {
return checkNotNull(subchannel.getAttributes().get(STATE_INFO), "STATE_INFO");
}
// package-private to avoid synthetic access
static boolean isReady(Subchannel subchannel) {
return getSubchannelStateInfoRef(subchannel).value.getState() == READY;
}
private static <T> Set<T> setsDifference(Set<T> a, Set<T> b) {
Set<T> aCopy = new HashSet<T>(a);
@ -312,7 +326,8 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
* Holds stickiness related states: The stickiness key, a registry mapping stickiness values to
* the associated Subchannel Ref, and a map from Subchannel to Subchannel Ref.
*/
private static final class StickinessState {
@VisibleForTesting
static final class StickinessState {
static final int MAX_ENTRIES = 1000;
final Key<String> key;
@ -332,7 +347,7 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
*/
@Nonnull
Subchannel maybeRegister(
String stickinessValue, @Nonnull Subchannel subchannel, List<Subchannel> rrList) {
String stickinessValue, @Nonnull Subchannel subchannel) {
final Ref<Subchannel> newSubchannelRef = subchannel.getAttributes().get(STICKY_REF);
while (true) {
Ref<Subchannel> existingSubchannelRef =
@ -344,7 +359,7 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
} else {
// existing entry
Subchannel existingSubchannel = existingSubchannelRef.value;
if (existingSubchannel != null && rrList.contains(existingSubchannel)) {
if (existingSubchannel != null && isReady(existingSubchannel)) {
return existingSubchannel;
}
}
@ -384,59 +399,49 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
}
}
}
// Only subclasses are ReadyPicker or EmptyPicker
private abstract static class RoundRobinPicker extends SubchannelPicker {
abstract boolean isEquivalentTo(RoundRobinPicker picker);
}
@VisibleForTesting
static final class Picker extends SubchannelPicker {
private static final AtomicIntegerFieldUpdater<Picker> indexUpdater =
AtomicIntegerFieldUpdater.newUpdater(Picker.class, "index");
static final class ReadyPicker extends RoundRobinPicker {
private static final AtomicIntegerFieldUpdater<ReadyPicker> indexUpdater =
AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index");
@Nullable
private final Status status;
private final List<Subchannel> list;
private final List<Subchannel> list; // non-empty
@Nullable
private final RoundRobinLoadBalancer.StickinessState stickinessState;
@SuppressWarnings("unused")
private volatile int index;
Picker(
List<Subchannel> list, @Nullable Status status, int startIndex,
ReadyPicker(List<Subchannel> list, int startIndex,
@Nullable RoundRobinLoadBalancer.StickinessState stickinessState) {
Preconditions.checkArgument(!list.isEmpty(), "empty list");
this.list = list;
this.status = status;
this.stickinessState = stickinessState;
this.index = startIndex - 1;
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
if (list.size() > 0) {
Subchannel subchannel = null;
if (stickinessState != null) {
String stickinessValue = args.getHeaders().get(stickinessState.key);
if (stickinessValue != null) {
subchannel = stickinessState.getSubchannel(stickinessValue);
if (subchannel == null || !list.contains(subchannel)) {
subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel(), list);
}
Subchannel subchannel = null;
if (stickinessState != null) {
String stickinessValue = args.getHeaders().get(stickinessState.key);
if (stickinessValue != null) {
subchannel = stickinessState.getSubchannel(stickinessValue);
if (subchannel == null || !RoundRobinLoadBalancer.isReady(subchannel)) {
subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel());
}
}
return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel());
}
if (status != null) {
return PickResult.withError(status);
}
return PickResult.withNoResult();
return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel());
}
private Subchannel nextSubchannel() {
int size = list.size();
if (size == 0) {
throw new NoSuchElementException();
}
int i = indexUpdater.incrementAndGet(this);
if (i >= size) {
int oldi = i;
@ -451,9 +456,37 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
return list;
}
@VisibleForTesting
Status getStatus() {
return status;
@Override
boolean isEquivalentTo(RoundRobinPicker picker) {
if (!(picker instanceof ReadyPicker)) {
return false;
}
ReadyPicker other = (ReadyPicker) picker;
// the lists cannot contain duplicate subchannels
return other == this || (stickinessState == other.stickinessState
&& list.size() == other.list.size()
&& new HashSet<Subchannel>(list).containsAll(other.list));
}
}
@VisibleForTesting
static final class EmptyPicker extends RoundRobinPicker {
private final Status status;
EmptyPicker(@Nonnull Status status) {
this.status = Preconditions.checkNotNull(status, "status");
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return status.isOk() ? PickResult.withNoResult() : PickResult.withError(status);
}
@Override
boolean isEquivalentTo(RoundRobinPicker picker) {
return picker instanceof EmptyPicker && (Objects.equal(status, ((EmptyPicker) picker).status)
|| (status.isOk() && ((EmptyPicker) picker).status.isOk()));
}
}
}

View File

@ -20,13 +20,16 @@ import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer.STATE_INFO;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
@ -49,16 +52,22 @@ import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.Status;
import io.grpc.internal.GrpcAttributes;
import io.grpc.util.RoundRobinLoadBalancerFactory.Picker;
import io.grpc.util.RoundRobinLoadBalancerFactory.EmptyPicker;
import io.grpc.util.RoundRobinLoadBalancerFactory.ReadyPicker;
import io.grpc.util.RoundRobinLoadBalancerFactory.Ref;
import io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer;
import io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer.StickinessState;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@ -87,7 +96,7 @@ public class RoundRobinLoadBalancerTest {
private Attributes affinity = Attributes.newBuilder().set(MAJOR_KEY, "I got the keys").build();
@Captor
private ArgumentCaptor<Picker> pickerCaptor;
private ArgumentCaptor<SubchannelPicker> pickerCaptor;
@Captor
private ArgumentCaptor<ConnectivityState> stateCaptor;
@Captor
@ -151,7 +160,7 @@ public class RoundRobinLoadBalancerTest {
assertEquals(CONNECTING, stateCaptor.getAllValues().get(0));
assertEquals(READY, stateCaptor.getAllValues().get(1));
assertThat(pickerCaptor.getValue().getList()).containsExactly(readySubchannel);
assertThat(getList(pickerCaptor.getValue())).containsExactly(readySubchannel);
verifyNoMoreInteractions(mockHelper);
}
@ -195,9 +204,8 @@ public class RoundRobinLoadBalancerTest {
InOrder inOrder = inOrder(mockHelper);
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
Picker picker = pickerCaptor.getValue();
assertNull(picker.getStatus());
assertThat(picker.getList()).containsExactly(removedSubchannel, oldSubchannel);
SubchannelPicker picker = pickerCaptor.getValue();
assertThat(getList(picker)).containsExactly(removedSubchannel, oldSubchannel);
verify(removedSubchannel, times(1)).requestConnection();
verify(oldSubchannel, times(1)).requestConnection();
@ -218,6 +226,9 @@ public class RoundRobinLoadBalancerTest {
verify(newSubchannel, times(1)).requestConnection();
verify(removedSubchannel, times(1)).shutdown();
loadBalancer.handleSubchannelState(removedSubchannel,
ConnectivityStateInfo.forNonError(SHUTDOWN));
assertThat(loadBalancer.getSubchannels()).containsExactly(oldSubchannel,
newSubchannel);
@ -227,8 +238,14 @@ public class RoundRobinLoadBalancerTest {
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
picker = pickerCaptor.getValue();
assertNull(picker.getStatus());
assertThat(picker.getList()).containsExactly(oldSubchannel, newSubchannel);
assertThat(getList(picker)).containsExactly(oldSubchannel, newSubchannel);
// test going from non-empty to empty
loadBalancer.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(),
affinity);
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertEquals(PickResult.withNoResult(), pickerCaptor.getValue().pickSubchannel(mockArgs));
verifyNoMoreInteractions(mockHelper);
}
@ -241,13 +258,13 @@ public class RoundRobinLoadBalancerTest {
Ref<ConnectivityStateInfo> subchannelStateInfo = subchannel.getAttributes().get(
STATE_INFO);
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(Picker.class));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class));
assertThat(subchannelStateInfo.value).isEqualTo(ConnectivityStateInfo.forNonError(IDLE));
loadBalancer.handleSubchannelState(subchannel,
ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertNull(pickerCaptor.getValue().getStatus());
assertThat(pickerCaptor.getValue()).isInstanceOf(ReadyPicker.class);
assertThat(subchannelStateInfo.value).isEqualTo(
ConnectivityStateInfo.forNonError(READY));
@ -257,12 +274,10 @@ public class RoundRobinLoadBalancerTest {
assertThat(subchannelStateInfo.value).isEqualTo(
ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
assertNull(pickerCaptor.getValue().getStatus());
assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class);
loadBalancer.handleSubchannelState(subchannel,
ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
assertNull(pickerCaptor.getValue().getStatus());
assertThat(subchannelStateInfo.value).isEqualTo(
ConnectivityStateInfo.forNonError(IDLE));
@ -282,9 +297,9 @@ public class RoundRobinLoadBalancerTest {
Subchannel subchannel1 = mock(Subchannel.class);
Subchannel subchannel2 = mock(Subchannel.class);
Picker picker = new Picker(Collections.unmodifiableList(Lists.newArrayList(
subchannel, subchannel1, subchannel2)), null /* status */, 0 /* startIndex */,
null /* stickinessState */);
ReadyPicker picker = new ReadyPicker(Collections.unmodifiableList(
Lists.<Subchannel>newArrayList(subchannel, subchannel1, subchannel2)),
0 /* startIndex */, null /* stickinessState */);
assertThat(picker.getList()).containsExactly(subchannel, subchannel1, subchannel2);
@ -296,8 +311,7 @@ public class RoundRobinLoadBalancerTest {
@Test
public void pickerEmptyList() throws Exception {
Picker picker =
new Picker(Lists.<Subchannel>newArrayList(), Status.UNKNOWN, 0, null /* stickinessState */);
SubchannelPicker picker = new EmptyPicker(Status.UNKNOWN);
assertEquals(null, picker.pickSubchannel(mockArgs).getSubchannel());
assertEquals(Status.UNKNOWN,
@ -363,23 +377,23 @@ public class RoundRobinLoadBalancerTest {
verify(mockHelper, times(6))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
Iterator<ConnectivityState> stateIterator = stateCaptor.getAllValues().iterator();
Iterator<Picker> pickers = pickerCaptor.getAllValues().iterator();
Iterator<SubchannelPicker> pickers = pickerCaptor.getAllValues().iterator();
// The picker is incrementally updated as subchannels become READY
assertEquals(CONNECTING, stateIterator.next());
assertThat(pickers.next().getList()).isEmpty();
assertThat(pickers.next()).isInstanceOf(EmptyPicker.class);
assertEquals(READY, stateIterator.next());
assertThat(pickers.next().getList()).containsExactly(sc1);
assertThat(getList(pickers.next())).containsExactly(sc1);
assertEquals(READY, stateIterator.next());
assertThat(pickers.next().getList()).containsExactly(sc1, sc2);
assertThat(getList(pickers.next())).containsExactly(sc1, sc2);
assertEquals(READY, stateIterator.next());
assertThat(pickers.next().getList()).containsExactly(sc1, sc2, sc3);
assertThat(getList(pickers.next())).containsExactly(sc1, sc2, sc3);
// The IDLE subchannel is dropped from the picker, but a reconnection is requested
assertEquals(READY, stateIterator.next());
assertThat(pickers.next().getList()).containsExactly(sc1, sc3);
assertThat(getList(pickers.next())).containsExactly(sc1, sc3);
verify(sc2, times(2)).requestConnection();
// The failing subchannel is dropped from the picker, with no requested reconnect
assertEquals(READY, stateIterator.next());
assertThat(pickers.next().getList()).containsExactly(sc1);
assertThat(getList(pickers.next())).containsExactly(sc1);
verify(sc3, times(1)).requestConnection();
assertThat(stateIterator.hasNext()).isFalse();
assertThat(pickers.hasNext()).isFalse();
@ -393,14 +407,14 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(any(ConnectivityState.class), pickerCaptor.capture());
Picker picker = pickerCaptor.getValue();
SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue = new Metadata();
headerWithStickinessValue.put(stickinessKey, "my-sticky-value");
doReturn(headerWithStickinessValue).when(mockArgs).getHeaders();
List<Subchannel> allSubchannels = picker.getList();
List<Subchannel> allSubchannels = getList(picker);
Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel();
Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel();
Subchannel sc3 = picker.pickSubchannel(mockArgs).getSubchannel();
@ -426,11 +440,11 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
Picker picker = pickerCaptor.getValue();
SubchannelPicker picker = pickerCaptor.getValue();
doReturn(new Metadata()).when(mockArgs).getHeaders();
List<Subchannel> allSubchannels = picker.getList();
List<Subchannel> allSubchannels = getList(picker);
Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel();
Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel();
@ -458,7 +472,7 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
Picker picker = pickerCaptor.getValue();
SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue = new Metadata();
@ -488,7 +502,7 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
Picker picker = pickerCaptor.getValue();
SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue1 = new Metadata();
@ -497,7 +511,7 @@ public class RoundRobinLoadBalancerTest {
Metadata headerWithStickinessValue2 = new Metadata();
headerWithStickinessValue2.put(stickinessKey, "my-sticky-value2");
List<Subchannel> allSubchannels = picker.getList();
List<Subchannel> allSubchannels = getList(picker);
doReturn(headerWithStickinessValue1).when(mockArgs).getHeaders();
Subchannel sc1a = picker.pickSubchannel(mockArgs).getSubchannel();
@ -533,16 +547,13 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
Picker picker = pickerCaptor.getValue();
SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue = new Metadata();
headerWithStickinessValue.put(stickinessKey, "my-sticky-value");
doReturn(headerWithStickinessValue).when(mockArgs).getHeaders();
@SuppressWarnings("unused")
List<Subchannel> allSubchannels = picker.getList();
// first pick
Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel();
@ -584,16 +595,13 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
Picker picker = pickerCaptor.getValue();
SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue1 = new Metadata();
headerWithStickinessValue1.put(stickinessKey, "my-sticky-value");
doReturn(headerWithStickinessValue1).when(mockArgs).getHeaders();
@SuppressWarnings("unused")
List<Subchannel> allSubchannels = picker.getList();
// first pick
Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel();
@ -609,7 +617,6 @@ public class RoundRobinLoadBalancerTest {
picker = pickerCaptor.getValue();
// second pick with a different stickiness value
@SuppressWarnings("unused")
Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel();
// go back to ready
@ -641,17 +648,18 @@ public class RoundRobinLoadBalancerTest {
}
verify(mockHelper, times(4))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
Picker picker = pickerCaptor.getValue();
SubchannelPicker picker = pickerCaptor.getValue();
Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
Metadata headerWithStickinessValue = new Metadata();
headerWithStickinessValue.put(stickinessKey, "my-sticky-value");
doReturn(headerWithStickinessValue).when(mockArgs).getHeaders();
List<Subchannel> allSubchannels = Lists.newArrayList(picker.getList());
List<Subchannel> allSubchannels = Lists.newArrayList(getList(picker));
Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel();
// shutdown channel directly
loadBalancer
.handleSubchannelState(sc1, ConnectivityStateInfo.forNonError(ConnectivityState.SHUTDOWN));
@ -661,6 +669,27 @@ public class RoundRobinLoadBalancerTest {
picker.pickSubchannel(mockArgs).getSubchannel());
assertThat(loadBalancer.getStickinessMapForTest()).hasSize(1);
verify(mockArgs, atLeast(2)).getHeaders();
Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel();
assertEquals(sc2, loadBalancer.getStickinessMapForTest().get("my-sticky-value").value);
// shutdown channel via name resolver change
List<EquivalentAddressGroup> newServers = new ArrayList<EquivalentAddressGroup>(servers);
newServers.remove(sc2.getAddresses());
loadBalancer.handleResolvedAddressGroups(newServers, attributes);
verify(sc2, times(1)).shutdown();
loadBalancer.handleSubchannelState(sc2, ConnectivityStateInfo.forNonError(SHUTDOWN));
assertNull(loadBalancer.getStickinessMapForTest().get("my-sticky-value").value);
assertEquals(nextSubchannel(sc2, allSubchannels),
picker.pickSubchannel(mockArgs).getSubchannel());
assertThat(loadBalancer.getStickinessMapForTest()).hasSize(1);
verify(mockArgs, atLeast(2)).getHeaders();
}
@Test
@ -696,6 +725,44 @@ public class RoundRobinLoadBalancerTest {
assertSame(stickinessMap1, stickinessMap2);
}
@Test(expected = IllegalArgumentException.class)
public void readyPicker_emptyList() {
// ready picker list must be non-empty
new ReadyPicker(Collections.<Subchannel>emptyList(), 0, null);
}
@Test
public void internalPickerComparisons() {
EmptyPicker emptyOk1 = new EmptyPicker(Status.OK);
EmptyPicker emptyOk2 = new EmptyPicker(Status.OK.withDescription("different OK"));
EmptyPicker emptyErr = new EmptyPicker(Status.UNKNOWN.withDescription("¯\\_(ツ)_//¯"));
Iterator<Subchannel> subchannelIterator = subchannels.values().iterator();
Subchannel sc1 = subchannelIterator.next();
Subchannel sc2 = subchannelIterator.next();
StickinessState stickinessState = new StickinessState("stick-key");
ReadyPicker ready1 = new ReadyPicker(Arrays.asList(sc1, sc2), 0, null);
ReadyPicker ready2 = new ReadyPicker(Arrays.asList(sc1), 0, null);
ReadyPicker ready3 = new ReadyPicker(Arrays.asList(sc2, sc1), 1, null);
ReadyPicker ready4 = new ReadyPicker(Arrays.asList(sc1, sc2), 1, stickinessState);
ReadyPicker ready5 = new ReadyPicker(Arrays.asList(sc2, sc1), 0, stickinessState);
assertTrue(emptyOk1.isEquivalentTo(emptyOk2));
assertFalse(emptyOk1.isEquivalentTo(emptyErr));
assertFalse(ready1.isEquivalentTo(ready2));
assertTrue(ready1.isEquivalentTo(ready3));
assertFalse(ready3.isEquivalentTo(ready4));
assertTrue(ready4.isEquivalentTo(ready5));
assertFalse(emptyOk1.isEquivalentTo(ready1));
assertFalse(ready1.isEquivalentTo(emptyOk1));
}
private static List<Subchannel> getList(SubchannelPicker picker) {
return picker instanceof ReadyPicker ? ((ReadyPicker) picker).getList() :
Collections.<Subchannel>emptyList();
}
private static class FakeSocketAddress extends SocketAddress {
final String name;