core: Add support for List<EAG> in Subchannels

This avoids the needs to flatten to EAGs for cases like PickFirst,
making the Attributes in EAGs able to be used in communication with
core. See #4302 for some discussion on the topic.
This commit is contained in:
Eric Anderson 2018-07-02 14:00:05 -07:00 committed by GitHub
parent 5b59c696fc
commit e3ff1ade07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 453 additions and 99 deletions

View File

@ -19,6 +19,7 @@ package io.grpc;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -455,21 +456,61 @@ public abstract class LoadBalancer {
* <p>The LoadBalancer is responsible for closing unused Subchannels, and closing all
* Subchannels within {@link #shutdown}.
*
* <p>The default implementation calls {@link #createSubchannel(List, Attributes)}.
* Implementations should not override this method.
*
* @since 1.2.0
*/
public abstract Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs);
public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
Preconditions.checkNotNull(addrs, "addrs");
return createSubchannel(Collections.singletonList(addrs), attrs);
}
/**
* Creates a Subchannel, which is a logical connection to the given group of addresses which are
* considered equivalent. The {@code attrs} are custom attributes associated with this
* Subchannel, and can be accessed later through {@link Subchannel#getAttributes
* Subchannel.getAttributes()}.
*
* <p>The LoadBalancer is responsible for closing unused Subchannels, and closing all
* Subchannels within {@link #shutdown}.
*
* @throws IllegalArgumentException if {@code addrs} is empty
* @since 1.14.0
*/
public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
throw new UnsupportedOperationException();
}
/**
* Replaces the existing addresses used with {@code subchannel}. This method is superior to
* {@link #createSubchannel} when the new and old addresses overlap, since the subchannel can
* continue using an existing connection.
*
* <p>The default implementation calls {@link #updateSubchannelAddresses(
* LoadBalancer.Subchannel, List)}. Implementations should not override this method.
*
* @throws IllegalArgumentException if {@code subchannel} was not returned from {@link
* #createSubchannel}
* @since 1.4.0
*/
public void updateSubchannelAddresses(
Subchannel subchannel, EquivalentAddressGroup addrs) {
Preconditions.checkNotNull(addrs, "addrs");
updateSubchannelAddresses(subchannel, Collections.singletonList(addrs));
}
/**
* Replaces the existing addresses used with {@code subchannel}. This method is superior to
* {@link #createSubchannel} when the new and old addresses overlap, since the subchannel can
* continue using an existing connection.
*
* @throws IllegalArgumentException if {@code subchannel} was not returned from {@link
* #createSubchannel} or {@code addrs} is empty
* @since 1.14.0
*/
public void updateSubchannelAddresses(
Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
throw new UnsupportedOperationException();
}
@ -572,11 +613,30 @@ public abstract class LoadBalancer {
public abstract void requestConnection();
/**
* Returns the addresses that this Subchannel is bound to.
* Returns the addresses that this Subchannel is bound to. The default implementation calls
* getAllAddresses().
*
* <p>The default implementation calls {@link #getAllAddresses()}. Implementations should not
* override this method.
*
* @throws IllegalStateException if this subchannel has more than one EquivalentAddressGroup.
* Use getAllAddresses() instead
* @since 1.2.0
*/
public abstract EquivalentAddressGroup getAddresses();
public EquivalentAddressGroup getAddresses() {
List<EquivalentAddressGroup> groups = getAllAddresses();
Preconditions.checkState(groups.size() == 1, "Does not have exactly one group");
return groups.get(0);
}
/**
* Returns the addresses that this Subchannel is bound to. The returned list will not be empty.
*
* @since 1.14.0
*/
public List<EquivalentAddressGroup> getAllAddresses() {
throw new UnsupportedOperationException();
}
/**
* The same attributes passed to {@link Helper#createSubchannel Helper.createSubchannel()}.

View File

@ -26,8 +26,6 @@ import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
/**
@ -67,19 +65,15 @@ public final class PickFirstBalancerFactory extends LoadBalancer.Factory {
@Override
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) {
// Flatten servers list received from name resolver into single address group. This means that
// as far as load balancer is concerned, there's virtually one single server with multiple
// addresses so the connection will be created only for the first address (pick first).
EquivalentAddressGroup newEag = flattenEquivalentAddressGroup(servers);
if (subchannel == null) {
subchannel = helper.createSubchannel(newEag, Attributes.EMPTY);
subchannel = helper.createSubchannel(servers, Attributes.EMPTY);
// The channel state does not get updated when doing name resolving today, so for the moment
// let LB report CONNECTION and call subchannel.requestConnection() immediately.
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
subchannel.requestConnection();
} else {
helper.updateSubchannelAddresses(subchannel, newEag);
helper.updateSubchannelAddresses(subchannel, servers);
}
}
@ -126,18 +120,6 @@ public final class PickFirstBalancerFactory extends LoadBalancer.Factory {
subchannel.shutdown();
}
}
/**
* Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object.
*/
private static EquivalentAddressGroup flattenEquivalentAddressGroup(
List<EquivalentAddressGroup> groupList) {
List<SocketAddress> addrs = new ArrayList<SocketAddress>();
for (EquivalentAddressGroup group : groupList) {
addrs.addAll(group.getAddresses());
}
return new EquivalentAddressGroup(addrs);
}
}
/**

View File

@ -42,6 +42,7 @@ import io.grpc.internal.Channelz.ChannelTrace;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@ -86,15 +87,12 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
// 3. Every synchronized("lock") must be inside a try-finally which calls drain() in "finally".
private final ChannelExecutor channelExecutor;
@GuardedBy("lock")
private EquivalentAddressGroup addressGroup;
/**
* The index of the address corresponding to pendingTransport/activeTransport, or 0 if both are
* null.
* The index of the address corresponding to pendingTransport/activeTransport, or at beginning if
* both are null.
*/
@GuardedBy("lock")
private int addressIndex;
private Index addressIndex;
/**
* The policy to control back off between reconnects. Non-{@code null} when a reconnect task is
@ -159,13 +157,17 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
@GuardedBy("lock")
private Status shutdownReason;
InternalSubchannel(EquivalentAddressGroup addressGroup, String authority, String userAgent,
InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
Supplier<Stopwatch> stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback,
Channelz channelz, CallTracer callsTracer, @Nullable ChannelTracer channelTracer,
TimeProvider timeProvider) {
this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup");
Preconditions.checkNotNull(addressGroups, "addressGroups");
Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
this.addressIndex = new Index(
Collections.unmodifiableList(new ArrayList<EquivalentAddressGroup>(addressGroups)));
this.authority = authority;
this.userAgent = userAgent;
this.backoffPolicyProvider = backoffPolicyProvider;
@ -213,11 +215,10 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
private void startNewTransport() {
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
if (addressIndex == 0) {
if (addressIndex.isAtBeginning()) {
connectingTimer.reset().start();
}
List<SocketAddress> addrs = addressGroup.getAddresses();
SocketAddress address = addrs.get(addressIndex);
SocketAddress address = addressIndex.getCurrentAddress();
ProxyParameters proxy = null;
if (address instanceof PairSocketAddress) {
@ -336,28 +337,29 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
}
/** Replaces the existing addresses, avoiding unnecessary reconnects. */
public void updateAddresses(EquivalentAddressGroup newAddressGroup) {
public void updateAddresses(List<EquivalentAddressGroup> newAddressGroups) {
Preconditions.checkNotNull(newAddressGroups, "newAddressGroups");
checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry");
Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty");
newAddressGroups =
Collections.unmodifiableList(new ArrayList<EquivalentAddressGroup>(newAddressGroups));
ManagedClientTransport savedTransport = null;
try {
synchronized (lock) {
EquivalentAddressGroup oldAddressGroup = addressGroup;
addressGroup = newAddressGroup;
SocketAddress previousAddress = addressIndex.getCurrentAddress();
addressIndex.updateGroups(newAddressGroups);
if (state.getState() == READY || state.getState() == CONNECTING) {
SocketAddress address = oldAddressGroup.getAddresses().get(addressIndex);
int newIndex = newAddressGroup.getAddresses().indexOf(address);
if (newIndex != -1) {
addressIndex = newIndex;
} else {
if (!addressIndex.seekTo(previousAddress)) {
// Forced to drop the connection
if (state.getState() == READY) {
savedTransport = activeTransport;
activeTransport = null;
addressIndex = 0;
addressIndex.reset();
gotoNonErrorState(IDLE);
} else {
savedTransport = pendingTransport;
pendingTransport = null;
addressIndex = 0;
addressIndex.reset();
startNewTransport();
}
}
@ -387,7 +389,7 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
savedPendingTransport = pendingTransport;
activeTransport = null;
pendingTransport = null;
addressIndex = 0;
addressIndex.reset();
if (transports.isEmpty()) {
handleTermination();
if (log.isLoggable(Level.FINE)) {
@ -409,15 +411,15 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
@Override
public String toString() {
// addressGroupCopy being a little stale is fine, just avoid calling toString with the lock
// addressGroupsCopy being a little stale is fine, just avoid calling toString with the lock
// since there may be many addresses.
Object addressGroupCopy;
Object addressGroupsCopy;
synchronized (lock) {
addressGroupCopy = addressGroup;
addressGroupsCopy = addressIndex.getGroups();
}
return MoreObjects.toStringHelper(this)
.add("logId", logId.getId())
.add("addressGroup", addressGroupCopy)
.add("addressGroups", addressGroupsCopy)
.toString();
}
@ -456,10 +458,10 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
}
}
EquivalentAddressGroup getAddressGroup() {
List<EquivalentAddressGroup> getAddressGroups() {
try {
synchronized (lock) {
return addressGroup;
return addressIndex.getGroups();
}
} finally {
channelExecutor.drain();
@ -487,14 +489,14 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
SettableFuture<ChannelStats> ret = SettableFuture.create();
ChannelStats.Builder builder = new ChannelStats.Builder();
EquivalentAddressGroup addressGroupSnapshot;
List<EquivalentAddressGroup> addressGroupsSnapshot;
List<WithLogId> transportsSnapshot;
synchronized (lock) {
addressGroupSnapshot = addressGroup;
addressGroupsSnapshot = addressIndex.getGroups();
transportsSnapshot = new ArrayList<WithLogId>(transports);
}
builder.setTarget(addressGroupSnapshot.toString()).setState(getState());
builder.setTarget(addressGroupsSnapshot.toString()).setState(getState());
builder.setSockets(transportsSnapshot);
callsTracer.updateBuilder(builder);
if (channelTracer != null) {
@ -515,6 +517,12 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
}
}
private static void checkListHasNoNulls(List<?> list, String msg) {
for (Object item : list) {
Preconditions.checkNotNull(item, msg);
}
}
/** Listener for real transports. */
private class TransportListener implements ManagedClientTransport.Listener {
final ConnectionClientTransport transport;
@ -573,15 +581,15 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
if (activeTransport == transport) {
gotoNonErrorState(IDLE);
activeTransport = null;
addressIndex = 0;
addressIndex.reset();
} else if (pendingTransport == transport) {
Preconditions.checkState(state.getState() == CONNECTING,
"Expected state is CONNECTING, actual state is %s", state.getState());
addressIndex++;
addressIndex.increment();
// Continue reconnect if there are still addresses to try.
if (addressIndex >= addressGroup.getAddresses().size()) {
if (!addressIndex.isValid()) {
pendingTransport = null;
addressIndex = 0;
addressIndex.reset();
// Initiate backoff
// Transition to TRANSIENT_FAILURE
scheduleBackoff(s);
@ -703,4 +711,68 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
};
}
}
/** Index as in 'i', the pointer to an entry. Not a "search index." */
@VisibleForTesting
static final class Index {
private List<EquivalentAddressGroup> addressGroups;
private int groupIndex;
private int addressIndex;
public Index(List<EquivalentAddressGroup> groups) {
this.addressGroups = groups;
}
public boolean isValid() {
// addressIndex will never be invalid
return groupIndex < addressGroups.size();
}
public boolean isAtBeginning() {
return groupIndex == 0 && addressIndex == 0;
}
public void increment() {
EquivalentAddressGroup group = addressGroups.get(groupIndex);
addressIndex++;
if (addressIndex >= group.getAddresses().size()) {
groupIndex++;
addressIndex = 0;
}
}
public void reset() {
groupIndex = 0;
addressIndex = 0;
}
public SocketAddress getCurrentAddress() {
return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
}
public List<EquivalentAddressGroup> getGroups() {
return addressGroups;
}
/** Update to new groups, resetting the current index. */
public void updateGroups(List<EquivalentAddressGroup> newGroups) {
addressGroups = newGroups;
reset();
}
/** Returns false if the needle was not found and the current index was left unchanged. */
public boolean seekTo(SocketAddress needle) {
for (int i = 0; i < addressGroups.size(); i++) {
EquivalentAddressGroup group = addressGroups.get(i);
int j = group.getAddresses().indexOf(needle);
if (j == -1) {
continue;
}
this.groupIndex = i;
this.addressIndex = j;
return true;
}
return false;
}
}
}

View File

@ -61,6 +61,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -1007,8 +1008,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
@Override
public AbstractSubchannel createSubchannel(
EquivalentAddressGroup addressGroup, Attributes attrs) {
checkNotNull(addressGroup, "addressGroup");
List<EquivalentAddressGroup> addressGroups, Attributes attrs) {
checkNotNull(addressGroups, "addressGroups");
checkNotNull(attrs, "attrs");
// TODO(ejona): can we be even stricter? Like loadBalancer == null?
checkState(!terminated, "Channel is terminated");
@ -1019,7 +1020,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
subchannelTracer = new ChannelTracer(maxTraceEvents, subchannelCreationTime, "Subchannel");
}
final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroup,
addressGroups,
authority(),
userAgent,
backoffPolicyProvider,
@ -1070,7 +1071,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
channelz.addSubchannel(internalSubchannel);
subchannel.subchannel = internalSubchannel;
logger.log(Level.FINE, "[{0}] {1} created for {2}",
new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroup});
new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroups});
runSerialized(new Runnable() {
@Override
public void run() {
@ -1125,7 +1126,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
@Override
public void updateSubchannelAddresses(
LoadBalancer.Subchannel subchannel, EquivalentAddressGroup addrs) {
LoadBalancer.Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
checkArgument(subchannel instanceof SubchannelImpl,
"subchannel must have been returned from createSubchannel");
((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs);
@ -1154,7 +1155,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
subchannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "Subchannel");
}
final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroup, authority, userAgent, backoffPolicyProvider, transportFactory,
Collections.singletonList(addressGroup),
authority, userAgent, backoffPolicyProvider, transportFactory,
transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
// All callback methods are run from channelExecutor
new InternalSubchannel.Callback() {
@ -1413,8 +1415,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
}
@Override
public EquivalentAddressGroup getAddresses() {
return subchannel.getAddressGroup();
public List<EquivalentAddressGroup> getAllAddresses() {
return subchannel.getAddressGroups();
}
@Override

View File

@ -43,6 +43,7 @@ import io.grpc.internal.Channelz.ChannelStats;
import io.grpc.internal.Channelz.ChannelTrace;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@ -159,8 +160,8 @@ final class OobChannel extends ManagedChannel implements Instrumented<ChannelSta
}
@Override
public EquivalentAddressGroup getAddresses() {
return subchannel.getAddressGroup();
public List<EquivalentAddressGroup> getAllAddresses() {
return subchannel.getAddressGroups();
}
@Override
@ -181,7 +182,7 @@ final class OobChannel extends ManagedChannel implements Instrumented<ChannelSta
}
void updateAddresses(EquivalentAddressGroup eag) {
subchannel.updateAddresses(eag);
subchannel.updateAddresses(Collections.singletonList(eag));
}
@Override

View File

@ -23,6 +23,9 @@ import io.grpc.ClientStreamTracer;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.Status;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@ -35,6 +38,11 @@ public class LoadBalancerTest {
private final ClientStreamTracer.Factory tracerFactory = mock(ClientStreamTracer.Factory.class);
private final Status status = Status.UNAVAILABLE.withDescription("for test");
private final Status status2 = Status.UNAVAILABLE.withDescription("for test 2");
private final EquivalentAddressGroup eag = new EquivalentAddressGroup(new SocketAddress() {});
private final Attributes attrs = Attributes.newBuilder()
.set(Attributes.Key.create("trash"), new Object())
.build();
private final Subchannel emptySubchannel = new EmptySubchannel();
@Test
public void pickResult_withSubchannel() {
@ -111,4 +119,114 @@ public class LoadBalancerTest {
assertThat(error1.getStatus()).isEqualTo(drop1.getStatus());
assertThat(error1).isNotEqualTo(drop1);
}
@Test
public void helper_createSubchannel_delegates() {
class OverrideCreateSubchannel extends NoopHelper {
boolean ran;
@Override
public Subchannel createSubchannel(List<EquivalentAddressGroup> addrsIn, Attributes attrsIn) {
assertThat(addrsIn).hasSize(1);
assertThat(addrsIn.get(0)).isSameAs(eag);
assertThat(attrsIn).isSameAs(attrs);
ran = true;
return subchannel;
}
}
OverrideCreateSubchannel helper = new OverrideCreateSubchannel();
assertThat(helper.createSubchannel(eag, attrs)).isSameAs(subchannel);
assertThat(helper.ran).isTrue();
}
@Test(expected = UnsupportedOperationException.class)
public void helper_createSubchannelList_throws() {
new NoopHelper().createSubchannel(Arrays.asList(eag), attrs);
}
@Test
public void helper_updateSubchannelAddresses_delegates() {
class OverrideUpdateSubchannel extends NoopHelper {
boolean ran;
@Override
public void updateSubchannelAddresses(
Subchannel subchannelIn, List<EquivalentAddressGroup> addrsIn) {
assertThat(subchannelIn).isSameAs(emptySubchannel);
assertThat(addrsIn).hasSize(1);
assertThat(addrsIn.get(0)).isSameAs(eag);
ran = true;
}
}
OverrideUpdateSubchannel helper = new OverrideUpdateSubchannel();
helper.updateSubchannelAddresses(emptySubchannel, eag);
assertThat(helper.ran).isTrue();
}
@Test(expected = UnsupportedOperationException.class)
public void helper_updateSubchannelAddressesList_throws() {
new NoopHelper().updateSubchannelAddresses(null, Arrays.asList(eag));
}
@Test
public void subchannel_getAddresses_delegates() {
class OverrideGetAllAddresses extends EmptySubchannel {
boolean ran;
@Override public List<EquivalentAddressGroup> getAllAddresses() {
ran = true;
return Arrays.asList(eag);
}
}
OverrideGetAllAddresses subchannel = new OverrideGetAllAddresses();
assertThat(subchannel.getAddresses()).isEqualTo(eag);
assertThat(subchannel.ran).isTrue();
}
@Test(expected = IllegalStateException.class)
public void subchannel_getAddresses_throwsOnTwoAddrs() {
new EmptySubchannel() {
boolean ran;
@Override public List<EquivalentAddressGroup> getAllAddresses() {
ran = true;
// Doubling up eag is technically a bad idea, but nothing here cares
return Arrays.asList(eag, eag);
}
}.getAddresses();
}
private static class NoopHelper extends LoadBalancer.Helper {
@Override
public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
return null;
}
@Override
public void updateBalancingState(
ConnectivityState newState, LoadBalancer.SubchannelPicker newPicker) {}
@Override public void runSerialized(Runnable task) {}
@Override public NameResolver.Factory getNameResolverFactory() {
return null;
}
@Override public String getAuthority() {
return null;
}
}
private static class EmptySubchannel extends LoadBalancer.Subchannel {
@Override public void shutdown() {}
@Override public void requestConnection() {}
@Override public Attributes getAttributes() {
return null;
}
}
}

View File

@ -22,6 +22,7 @@ import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.inOrder;
@ -63,8 +64,6 @@ public class PickFirstLoadBalancerTest {
private static final Attributes.Key<String> FOO = Attributes.Key.create("foo");
private Attributes affinity = Attributes.newBuilder().set(FOO, "bar").build();
@Captor
private ArgumentCaptor<EquivalentAddressGroup> eagCaptor;
@Captor
private ArgumentCaptor<Picker> pickerCaptor;
@Captor
@ -86,7 +85,8 @@ public class PickFirstLoadBalancerTest {
}
when(mockSubchannel.getAddresses()).thenThrow(new UnsupportedOperationException());
when(mockHelper.createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)))
when(mockHelper.createSubchannel(
anyListOf(EquivalentAddressGroup.class), any(Attributes.class)))
.thenReturn(mockSubchannel);
loadBalancer = (PickFirstBalancer) PickFirstBalancerFactory.getInstance().newLoadBalancer(
@ -102,11 +102,10 @@ public class PickFirstLoadBalancerTest {
public void pickAfterResolved() throws Exception {
loadBalancer.handleResolvedAddressGroups(servers, affinity);
verify(mockHelper).createSubchannel(eagCaptor.capture(), attrsCaptor.capture());
verify(mockHelper).createSubchannel(eq(servers), attrsCaptor.capture());
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection();
assertEquals(new EquivalentAddressGroup(socketAddresses), eagCaptor.getValue());
assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs),
pickerCaptor.getValue().pickSubchannel(mockArgs));
@ -120,12 +119,12 @@ public class PickFirstLoadBalancerTest {
loadBalancer.handleResolvedAddressGroups(servers, affinity);
verifyNoMoreInteractions(mockSubchannel);
verify(mockHelper).createSubchannel(any(EquivalentAddressGroup.class),
verify(mockHelper).createSubchannel(anyListOf(EquivalentAddressGroup.class),
any(Attributes.class));
verify(mockHelper).updateBalancingState(isA(ConnectivityState.class), isA(Picker.class));
// Updating the subchannel addresses is unnecessary, but doesn't hurt anything
verify(mockHelper).updateSubchannelAddresses(
eq(mockSubchannel), any(EquivalentAddressGroup.class));
eq(mockSubchannel), anyListOf(EquivalentAddressGroup.class));
verifyNoMoreInteractions(mockHelper);
}
@ -140,15 +139,13 @@ public class PickFirstLoadBalancerTest {
InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleResolvedAddressGroups(servers, affinity);
inOrder.verify(mockHelper).createSubchannel(eagCaptor.capture(), any(Attributes.class));
inOrder.verify(mockHelper).createSubchannel(eq(servers), any(Attributes.class));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection();
assertEquals(socketAddresses, eagCaptor.getValue().getAddresses());
assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
loadBalancer.handleResolvedAddressGroups(newServers, affinity);
inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eagCaptor.capture());
assertEquals(newSocketAddresses, eagCaptor.getValue().getAddresses());
inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eq(newServers));
verifyNoMoreInteractions(mockSubchannel);
verifyNoMoreInteractions(mockHelper);
@ -209,8 +206,7 @@ public class PickFirstLoadBalancerTest {
verify(mockSubchannel, never()).requestConnection();
loadBalancer.handleResolvedAddressGroups(servers, affinity);
inOrder.verify(mockHelper).createSubchannel(eq(new EquivalentAddressGroup(socketAddresses)),
eq(Attributes.EMPTY));
inOrder.verify(mockHelper).createSubchannel(eq(servers), eq(Attributes.EMPTY));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection();

View File

@ -113,8 +113,8 @@ public class AutoConfiguredLoadBalancerFactoryTest {
new EquivalentAddressGroup(new SocketAddress(){}, Attributes.EMPTY));
Helper helper = new TestHelper() {
@Override
public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
assertThat(addrs).isEqualTo(servers.get(0));
public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
assertThat(addrs).isEqualTo(servers);
return new TestSubchannel(addrs, attrs);
}
@ -147,8 +147,8 @@ public class AutoConfiguredLoadBalancerFactoryTest {
Attributes.EMPTY));
Helper helper = new TestHelper() {
@Override
public Subchannel createSubchannel(final EquivalentAddressGroup addrs, Attributes attrs) {
assertThat(addrs).isEqualTo(servers.get(0));
public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
assertThat(addrs).isEqualTo(servers);
return new TestSubchannel(addrs, attrs);
}
@ -304,7 +304,7 @@ public class AutoConfiguredLoadBalancerFactoryTest {
}
@Override
public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
return delegate().createSubchannel(addrs, attrs);
}
@ -348,12 +348,12 @@ public class AutoConfiguredLoadBalancerFactoryTest {
}
private static class TestSubchannel extends Subchannel {
TestSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
TestSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
this.addrs = addrs;
this.attrs = attrs;
}
final EquivalentAddressGroup addrs;
final List<EquivalentAddressGroup> addrs;
final Attributes attrs;
@Override
@ -365,7 +365,7 @@ public class AutoConfiguredLoadBalancerFactoryTest {
}
@Override
public EquivalentAddressGroup getAddresses() {
public List<EquivalentAddressGroup> getAllAddresses() {
return addrs;
}

View File

@ -42,10 +42,12 @@ import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.Status;
import io.grpc.internal.InternalSubchannel.CallTracingTransport;
import io.grpc.internal.InternalSubchannel.Index;
import io.grpc.internal.TestUtils.MockClientTransportInfo;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
@ -114,7 +116,6 @@ public class InternalSubchannelTest {
};
private InternalSubchannel internalSubchannel;
private EquivalentAddressGroup addressGroup;
private BlockingQueue<MockClientTransportInfo> transports;
@Before public void setUp() {
@ -133,6 +134,16 @@ public class InternalSubchannelTest {
assertEquals(0, fakeExecutor.numPendingTasks());
}
@Test(expected = IllegalArgumentException.class)
public void constructor_emptyEagList_throws() {
createInternalSubchannel(new EquivalentAddressGroup[0]);
}
@Test(expected = NullPointerException.class)
public void constructor_eagListWithNull_throws() {
createInternalSubchannel(new EquivalentAddressGroup[] {null});
}
@Test public void singleAddressReconnect() {
SocketAddress addr = mock(SocketAddress.class);
createInternalSubchannel(addr);
@ -379,6 +390,20 @@ public class InternalSubchannelTest {
verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffNanos();
}
@Test(expected = IllegalArgumentException.class)
public void updateAddresses_emptyEagList_throws() {
SocketAddress addr = new FakeSocketAddress();
createInternalSubchannel(addr);
internalSubchannel.updateAddresses(Arrays.<EquivalentAddressGroup>asList());
}
@Test(expected = NullPointerException.class)
public void updateAddresses_eagListWithNull_throws() {
SocketAddress addr = new FakeSocketAddress();
createInternalSubchannel(addr);
internalSubchannel.updateAddresses(Arrays.asList((EquivalentAddressGroup) null));
}
@Test public void updateAddresses_intersecting_ready() {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
@ -400,7 +425,8 @@ public class InternalSubchannelTest {
assertEquals(READY, internalSubchannel.getState());
// Update addresses
internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr2, addr3)));
internalSubchannel.updateAddresses(
Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
assertNoCallbackInvoke();
assertEquals(READY, internalSubchannel.getState());
verify(transports.peek().transport, never()).shutdown(any(Status.class));
@ -442,7 +468,8 @@ public class InternalSubchannelTest {
assertEquals(CONNECTING, internalSubchannel.getState());
// Update addresses
internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr2, addr3)));
internalSubchannel.updateAddresses(
Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
assertNoCallbackInvoke();
assertEquals(CONNECTING, internalSubchannel.getState());
verify(transports.peek().transport, never()).shutdown(any(Status.class));
@ -471,7 +498,7 @@ public class InternalSubchannelTest {
SocketAddress addr2 = mock(SocketAddress.class);
createInternalSubchannel(addr1);
internalSubchannel.updateAddresses(new EquivalentAddressGroup(addr2));
internalSubchannel.updateAddresses(Arrays.asList(new EquivalentAddressGroup(addr2)));
// Nothing happened on address update
verify(mockTransportFactory, never())
@ -519,7 +546,8 @@ public class InternalSubchannelTest {
assertEquals(READY, internalSubchannel.getState());
// Update addresses
internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr3, addr4)));
internalSubchannel.updateAddresses(
Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4))));
assertExactCallbackInvokes("onStateChange:IDLE");
assertEquals(IDLE, internalSubchannel.getState());
verify(transports.peek().transport).shutdown(any(Status.class));
@ -561,7 +589,8 @@ public class InternalSubchannelTest {
assertEquals(CONNECTING, internalSubchannel.getState());
// Update addresses
internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr3, addr4)));
internalSubchannel.updateAddresses(
Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4))));
assertNoCallbackInvoke();
assertEquals(CONNECTING, internalSubchannel.getState());
@ -946,9 +975,100 @@ public class InternalSubchannelTest {
assertEquals(actualTransport.transport.getLogId(), registeredTransport.getLogId());
}
@Test public void index_looping() {
SocketAddress addr1 = new FakeSocketAddress();
SocketAddress addr2 = new FakeSocketAddress();
SocketAddress addr3 = new FakeSocketAddress();
SocketAddress addr4 = new FakeSocketAddress();
SocketAddress addr5 = new FakeSocketAddress();
Index index = new Index(Arrays.asList(
new EquivalentAddressGroup(Arrays.asList(addr1, addr2)),
new EquivalentAddressGroup(Arrays.asList(addr3)),
new EquivalentAddressGroup(Arrays.asList(addr4, addr5))));
assertThat(index.getCurrentAddress()).isSameAs(addr1);
assertThat(index.isAtBeginning()).isTrue();
assertThat(index.isValid()).isTrue();
index.increment();
assertThat(index.getCurrentAddress()).isSameAs(addr2);
assertThat(index.isAtBeginning()).isFalse();
assertThat(index.isValid()).isTrue();
index.increment();
assertThat(index.getCurrentAddress()).isSameAs(addr3);
assertThat(index.isAtBeginning()).isFalse();
assertThat(index.isValid()).isTrue();
index.increment();
assertThat(index.getCurrentAddress()).isSameAs(addr4);
assertThat(index.isAtBeginning()).isFalse();
assertThat(index.isValid()).isTrue();
index.increment();
assertThat(index.getCurrentAddress()).isSameAs(addr5);
assertThat(index.isAtBeginning()).isFalse();
assertThat(index.isValid()).isTrue();
index.increment();
assertThat(index.isAtBeginning()).isFalse();
assertThat(index.isValid()).isFalse();
index.reset();
assertThat(index.getCurrentAddress()).isSameAs(addr1);
assertThat(index.isAtBeginning()).isTrue();
assertThat(index.isValid()).isTrue();
// We want to make sure both groupIndex and addressIndex are reset
index.increment();
index.increment();
index.increment();
index.increment();
assertThat(index.getCurrentAddress()).isSameAs(addr5);
index.reset();
assertThat(index.getCurrentAddress()).isSameAs(addr1);
}
@Test public void index_updateGroups_resets() {
SocketAddress addr1 = new FakeSocketAddress();
SocketAddress addr2 = new FakeSocketAddress();
SocketAddress addr3 = new FakeSocketAddress();
Index index = new Index(Arrays.asList(
new EquivalentAddressGroup(Arrays.asList(addr1)),
new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
index.increment();
index.increment();
// We want to make sure both groupIndex and addressIndex are reset
index.updateGroups(Arrays.asList(
new EquivalentAddressGroup(Arrays.asList(addr1)),
new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
assertThat(index.getCurrentAddress()).isSameAs(addr1);
}
@Test public void index_seekTo() {
SocketAddress addr1 = new FakeSocketAddress();
SocketAddress addr2 = new FakeSocketAddress();
SocketAddress addr3 = new FakeSocketAddress();
Index index = new Index(Arrays.asList(
new EquivalentAddressGroup(Arrays.asList(addr1, addr2)),
new EquivalentAddressGroup(Arrays.asList(addr3))));
assertThat(index.seekTo(addr3)).isTrue();
assertThat(index.getCurrentAddress()).isSameAs(addr3);
assertThat(index.seekTo(addr1)).isTrue();
assertThat(index.getCurrentAddress()).isSameAs(addr1);
assertThat(index.seekTo(addr2)).isTrue();
assertThat(index.getCurrentAddress()).isSameAs(addr2);
index.seekTo(new FakeSocketAddress());
// Failed seekTo doesn't change the index
assertThat(index.getCurrentAddress()).isSameAs(addr2);
}
private void createInternalSubchannel(SocketAddress ... addrs) {
addressGroup = new EquivalentAddressGroup(Arrays.asList(addrs));
internalSubchannel = new InternalSubchannel(addressGroup, AUTHORITY, USER_AGENT,
createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addrs)));
}
private void createInternalSubchannel(EquivalentAddressGroup ... addrs) {
List<EquivalentAddressGroup> addressGroups = Arrays.asList(addrs);
internalSubchannel = new InternalSubchannel(addressGroups, AUTHORITY, USER_AGENT,
mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(),
fakeClock.getStopwatchSupplier(), channelExecutor, mockInternalSubchannelCallback,
channelz, CallTracer.getDefaultFactory().create(), null,
@ -970,4 +1090,6 @@ public class InternalSubchannelTest {
assertEquals(Arrays.asList(expectedInvokes), callbackInvokes);
callbackInvokes.clear();
}
private static class FakeSocketAddress extends SocketAddress {}
}

View File

@ -2064,7 +2064,8 @@ public class ManagedChannelImplTest {
assertEquals(TARGET, getStats(channel).target);
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
assertEquals(addressGroup.toString(), getStats((AbstractSubchannel) subchannel).target);
assertEquals(Collections.singletonList(addressGroup).toString(),
getStats((AbstractSubchannel) subchannel).target);
}
@Test