diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java index 6e08964ee4..5b971c1000 100644 --- a/core/src/main/java/io/grpc/LoadBalancer.java +++ b/core/src/main/java/io/grpc/LoadBalancer.java @@ -19,6 +19,7 @@ package io.grpc; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import java.util.Collections; import java.util.List; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -455,21 +456,61 @@ public abstract class LoadBalancer { *

The LoadBalancer is responsible for closing unused Subchannels, and closing all * Subchannels within {@link #shutdown}. * + *

The default implementation calls {@link #createSubchannel(List, Attributes)}. + * Implementations should not override this method. + * * @since 1.2.0 */ - public abstract Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs); + public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { + Preconditions.checkNotNull(addrs, "addrs"); + return createSubchannel(Collections.singletonList(addrs), attrs); + } + + /** + * Creates a Subchannel, which is a logical connection to the given group of addresses which are + * considered equivalent. The {@code attrs} are custom attributes associated with this + * Subchannel, and can be accessed later through {@link Subchannel#getAttributes + * Subchannel.getAttributes()}. + * + *

The LoadBalancer is responsible for closing unused Subchannels, and closing all + * Subchannels within {@link #shutdown}. + * + * @throws IllegalArgumentException if {@code addrs} is empty + * @since 1.14.0 + */ + public Subchannel createSubchannel(List addrs, Attributes attrs) { + throw new UnsupportedOperationException(); + } /** * Replaces the existing addresses used with {@code subchannel}. This method is superior to * {@link #createSubchannel} when the new and old addresses overlap, since the subchannel can * continue using an existing connection. * + *

The default implementation calls {@link #updateSubchannelAddresses( + * LoadBalancer.Subchannel, List)}. Implementations should not override this method. + * * @throws IllegalArgumentException if {@code subchannel} was not returned from {@link * #createSubchannel} * @since 1.4.0 */ public void updateSubchannelAddresses( Subchannel subchannel, EquivalentAddressGroup addrs) { + Preconditions.checkNotNull(addrs, "addrs"); + updateSubchannelAddresses(subchannel, Collections.singletonList(addrs)); + } + + /** + * Replaces the existing addresses used with {@code subchannel}. This method is superior to + * {@link #createSubchannel} when the new and old addresses overlap, since the subchannel can + * continue using an existing connection. + * + * @throws IllegalArgumentException if {@code subchannel} was not returned from {@link + * #createSubchannel} or {@code addrs} is empty + * @since 1.14.0 + */ + public void updateSubchannelAddresses( + Subchannel subchannel, List addrs) { throw new UnsupportedOperationException(); } @@ -572,11 +613,30 @@ public abstract class LoadBalancer { public abstract void requestConnection(); /** - * Returns the addresses that this Subchannel is bound to. + * Returns the addresses that this Subchannel is bound to. The default implementation calls + * getAllAddresses(). * + *

The default implementation calls {@link #getAllAddresses()}. Implementations should not + * override this method. + * + * @throws IllegalStateException if this subchannel has more than one EquivalentAddressGroup. + * Use getAllAddresses() instead * @since 1.2.0 */ - public abstract EquivalentAddressGroup getAddresses(); + public EquivalentAddressGroup getAddresses() { + List groups = getAllAddresses(); + Preconditions.checkState(groups.size() == 1, "Does not have exactly one group"); + return groups.get(0); + } + + /** + * Returns the addresses that this Subchannel is bound to. The returned list will not be empty. + * + * @since 1.14.0 + */ + public List getAllAddresses() { + throw new UnsupportedOperationException(); + } /** * The same attributes passed to {@link Helper#createSubchannel Helper.createSubchannel()}. diff --git a/core/src/main/java/io/grpc/PickFirstBalancerFactory.java b/core/src/main/java/io/grpc/PickFirstBalancerFactory.java index 1bca5ffe41..86677d3e2f 100644 --- a/core/src/main/java/io/grpc/PickFirstBalancerFactory.java +++ b/core/src/main/java/io/grpc/PickFirstBalancerFactory.java @@ -26,8 +26,6 @@ import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; -import java.net.SocketAddress; -import java.util.ArrayList; import java.util.List; /** @@ -67,19 +65,15 @@ public final class PickFirstBalancerFactory extends LoadBalancer.Factory { @Override public void handleResolvedAddressGroups( List servers, Attributes attributes) { - // Flatten servers list received from name resolver into single address group. This means that - // as far as load balancer is concerned, there's virtually one single server with multiple - // addresses so the connection will be created only for the first address (pick first). - EquivalentAddressGroup newEag = flattenEquivalentAddressGroup(servers); if (subchannel == null) { - subchannel = helper.createSubchannel(newEag, Attributes.EMPTY); + subchannel = helper.createSubchannel(servers, Attributes.EMPTY); // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel))); subchannel.requestConnection(); } else { - helper.updateSubchannelAddresses(subchannel, newEag); + helper.updateSubchannelAddresses(subchannel, servers); } } @@ -126,18 +120,6 @@ public final class PickFirstBalancerFactory extends LoadBalancer.Factory { subchannel.shutdown(); } } - - /** - * Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object. - */ - private static EquivalentAddressGroup flattenEquivalentAddressGroup( - List groupList) { - List addrs = new ArrayList(); - for (EquivalentAddressGroup group : groupList) { - addrs.addAll(group.getAddresses()); - } - return new EquivalentAddressGroup(addrs); - } } /** diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 1c8f59595e..8e8c22ba51 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -42,6 +42,7 @@ import io.grpc.internal.Channelz.ChannelTrace; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -86,15 +87,12 @@ final class InternalSubchannel implements Instrumented { // 3. Every synchronized("lock") must be inside a try-finally which calls drain() in "finally". private final ChannelExecutor channelExecutor; - @GuardedBy("lock") - private EquivalentAddressGroup addressGroup; - /** - * The index of the address corresponding to pendingTransport/activeTransport, or 0 if both are - * null. + * The index of the address corresponding to pendingTransport/activeTransport, or at beginning if + * both are null. */ @GuardedBy("lock") - private int addressIndex; + private Index addressIndex; /** * The policy to control back off between reconnects. Non-{@code null} when a reconnect task is @@ -159,13 +157,17 @@ final class InternalSubchannel implements Instrumented { @GuardedBy("lock") private Status shutdownReason; - InternalSubchannel(EquivalentAddressGroup addressGroup, String authority, String userAgent, + InternalSubchannel(List addressGroups, String authority, String userAgent, BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, Supplier stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback, Channelz channelz, CallTracer callsTracer, @Nullable ChannelTracer channelTracer, TimeProvider timeProvider) { - this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup"); + Preconditions.checkNotNull(addressGroups, "addressGroups"); + Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty"); + checkListHasNoNulls(addressGroups, "addressGroups contains null entry"); + this.addressIndex = new Index( + Collections.unmodifiableList(new ArrayList(addressGroups))); this.authority = authority; this.userAgent = userAgent; this.backoffPolicyProvider = backoffPolicyProvider; @@ -213,11 +215,10 @@ final class InternalSubchannel implements Instrumented { private void startNewTransport() { Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled"); - if (addressIndex == 0) { + if (addressIndex.isAtBeginning()) { connectingTimer.reset().start(); } - List addrs = addressGroup.getAddresses(); - SocketAddress address = addrs.get(addressIndex); + SocketAddress address = addressIndex.getCurrentAddress(); ProxyParameters proxy = null; if (address instanceof PairSocketAddress) { @@ -336,28 +337,29 @@ final class InternalSubchannel implements Instrumented { } /** Replaces the existing addresses, avoiding unnecessary reconnects. */ - public void updateAddresses(EquivalentAddressGroup newAddressGroup) { + public void updateAddresses(List newAddressGroups) { + Preconditions.checkNotNull(newAddressGroups, "newAddressGroups"); + checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry"); + Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty"); + newAddressGroups = + Collections.unmodifiableList(new ArrayList(newAddressGroups)); ManagedClientTransport savedTransport = null; try { synchronized (lock) { - EquivalentAddressGroup oldAddressGroup = addressGroup; - addressGroup = newAddressGroup; + SocketAddress previousAddress = addressIndex.getCurrentAddress(); + addressIndex.updateGroups(newAddressGroups); if (state.getState() == READY || state.getState() == CONNECTING) { - SocketAddress address = oldAddressGroup.getAddresses().get(addressIndex); - int newIndex = newAddressGroup.getAddresses().indexOf(address); - if (newIndex != -1) { - addressIndex = newIndex; - } else { + if (!addressIndex.seekTo(previousAddress)) { // Forced to drop the connection if (state.getState() == READY) { savedTransport = activeTransport; activeTransport = null; - addressIndex = 0; + addressIndex.reset(); gotoNonErrorState(IDLE); } else { savedTransport = pendingTransport; pendingTransport = null; - addressIndex = 0; + addressIndex.reset(); startNewTransport(); } } @@ -387,7 +389,7 @@ final class InternalSubchannel implements Instrumented { savedPendingTransport = pendingTransport; activeTransport = null; pendingTransport = null; - addressIndex = 0; + addressIndex.reset(); if (transports.isEmpty()) { handleTermination(); if (log.isLoggable(Level.FINE)) { @@ -409,15 +411,15 @@ final class InternalSubchannel implements Instrumented { @Override public String toString() { - // addressGroupCopy being a little stale is fine, just avoid calling toString with the lock + // addressGroupsCopy being a little stale is fine, just avoid calling toString with the lock // since there may be many addresses. - Object addressGroupCopy; + Object addressGroupsCopy; synchronized (lock) { - addressGroupCopy = addressGroup; + addressGroupsCopy = addressIndex.getGroups(); } return MoreObjects.toStringHelper(this) .add("logId", logId.getId()) - .add("addressGroup", addressGroupCopy) + .add("addressGroups", addressGroupsCopy) .toString(); } @@ -456,10 +458,10 @@ final class InternalSubchannel implements Instrumented { } } - EquivalentAddressGroup getAddressGroup() { + List getAddressGroups() { try { synchronized (lock) { - return addressGroup; + return addressIndex.getGroups(); } } finally { channelExecutor.drain(); @@ -487,14 +489,14 @@ final class InternalSubchannel implements Instrumented { SettableFuture ret = SettableFuture.create(); ChannelStats.Builder builder = new ChannelStats.Builder(); - EquivalentAddressGroup addressGroupSnapshot; + List addressGroupsSnapshot; List transportsSnapshot; synchronized (lock) { - addressGroupSnapshot = addressGroup; + addressGroupsSnapshot = addressIndex.getGroups(); transportsSnapshot = new ArrayList(transports); } - builder.setTarget(addressGroupSnapshot.toString()).setState(getState()); + builder.setTarget(addressGroupsSnapshot.toString()).setState(getState()); builder.setSockets(transportsSnapshot); callsTracer.updateBuilder(builder); if (channelTracer != null) { @@ -515,6 +517,12 @@ final class InternalSubchannel implements Instrumented { } } + private static void checkListHasNoNulls(List list, String msg) { + for (Object item : list) { + Preconditions.checkNotNull(item, msg); + } + } + /** Listener for real transports. */ private class TransportListener implements ManagedClientTransport.Listener { final ConnectionClientTransport transport; @@ -573,15 +581,15 @@ final class InternalSubchannel implements Instrumented { if (activeTransport == transport) { gotoNonErrorState(IDLE); activeTransport = null; - addressIndex = 0; + addressIndex.reset(); } else if (pendingTransport == transport) { Preconditions.checkState(state.getState() == CONNECTING, "Expected state is CONNECTING, actual state is %s", state.getState()); - addressIndex++; + addressIndex.increment(); // Continue reconnect if there are still addresses to try. - if (addressIndex >= addressGroup.getAddresses().size()) { + if (!addressIndex.isValid()) { pendingTransport = null; - addressIndex = 0; + addressIndex.reset(); // Initiate backoff // Transition to TRANSIENT_FAILURE scheduleBackoff(s); @@ -703,4 +711,68 @@ final class InternalSubchannel implements Instrumented { }; } } + + /** Index as in 'i', the pointer to an entry. Not a "search index." */ + @VisibleForTesting + static final class Index { + private List addressGroups; + private int groupIndex; + private int addressIndex; + + public Index(List groups) { + this.addressGroups = groups; + } + + public boolean isValid() { + // addressIndex will never be invalid + return groupIndex < addressGroups.size(); + } + + public boolean isAtBeginning() { + return groupIndex == 0 && addressIndex == 0; + } + + public void increment() { + EquivalentAddressGroup group = addressGroups.get(groupIndex); + addressIndex++; + if (addressIndex >= group.getAddresses().size()) { + groupIndex++; + addressIndex = 0; + } + } + + public void reset() { + groupIndex = 0; + addressIndex = 0; + } + + public SocketAddress getCurrentAddress() { + return addressGroups.get(groupIndex).getAddresses().get(addressIndex); + } + + public List getGroups() { + return addressGroups; + } + + /** Update to new groups, resetting the current index. */ + public void updateGroups(List newGroups) { + addressGroups = newGroups; + reset(); + } + + /** Returns false if the needle was not found and the current index was left unchanged. */ + public boolean seekTo(SocketAddress needle) { + for (int i = 0; i < addressGroups.size(); i++) { + EquivalentAddressGroup group = addressGroups.get(i); + int j = group.getAddresses().indexOf(needle); + if (j == -1) { + continue; + } + this.groupIndex = i; + this.addressIndex = j; + return true; + } + return false; + } + } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 9a9ac32ffe..7b6fc9e015 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -61,6 +61,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -1007,8 +1008,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented addressGroups, Attributes attrs) { + checkNotNull(addressGroups, "addressGroups"); checkNotNull(attrs, "attrs"); // TODO(ejona): can we be even stricter? Like loadBalancer == null? checkState(!terminated, "Channel is terminated"); @@ -1019,7 +1020,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented addrs) { checkArgument(subchannel instanceof SubchannelImpl, "subchannel must have been returned from createSubchannel"); ((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs); @@ -1154,7 +1155,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented getAllAddresses() { + return subchannel.getAddressGroups(); } @Override diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index 1b1829a1cc..95368c0e73 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -43,6 +43,7 @@ import io.grpc.internal.Channelz.ChannelStats; import io.grpc.internal.Channelz.ChannelTrace; import io.grpc.internal.ClientCallImpl.ClientTransportProvider; import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -159,8 +160,8 @@ final class OobChannel extends ManagedChannel implements Instrumented getAllAddresses() { + return subchannel.getAddressGroups(); } @Override @@ -181,7 +182,7 @@ final class OobChannel extends ManagedChannel implements Instrumented addrsIn, Attributes attrsIn) { + assertThat(addrsIn).hasSize(1); + assertThat(addrsIn.get(0)).isSameAs(eag); + assertThat(attrsIn).isSameAs(attrs); + ran = true; + return subchannel; + } + } + + OverrideCreateSubchannel helper = new OverrideCreateSubchannel(); + assertThat(helper.createSubchannel(eag, attrs)).isSameAs(subchannel); + assertThat(helper.ran).isTrue(); + } + + @Test(expected = UnsupportedOperationException.class) + public void helper_createSubchannelList_throws() { + new NoopHelper().createSubchannel(Arrays.asList(eag), attrs); + } + + @Test + public void helper_updateSubchannelAddresses_delegates() { + class OverrideUpdateSubchannel extends NoopHelper { + boolean ran; + + @Override + public void updateSubchannelAddresses( + Subchannel subchannelIn, List addrsIn) { + assertThat(subchannelIn).isSameAs(emptySubchannel); + assertThat(addrsIn).hasSize(1); + assertThat(addrsIn.get(0)).isSameAs(eag); + ran = true; + } + } + + OverrideUpdateSubchannel helper = new OverrideUpdateSubchannel(); + helper.updateSubchannelAddresses(emptySubchannel, eag); + assertThat(helper.ran).isTrue(); + } + + @Test(expected = UnsupportedOperationException.class) + public void helper_updateSubchannelAddressesList_throws() { + new NoopHelper().updateSubchannelAddresses(null, Arrays.asList(eag)); + } + + @Test + public void subchannel_getAddresses_delegates() { + class OverrideGetAllAddresses extends EmptySubchannel { + boolean ran; + + @Override public List getAllAddresses() { + ran = true; + return Arrays.asList(eag); + } + } + + OverrideGetAllAddresses subchannel = new OverrideGetAllAddresses(); + assertThat(subchannel.getAddresses()).isEqualTo(eag); + assertThat(subchannel.ran).isTrue(); + } + + @Test(expected = IllegalStateException.class) + public void subchannel_getAddresses_throwsOnTwoAddrs() { + new EmptySubchannel() { + boolean ran; + + @Override public List getAllAddresses() { + ran = true; + // Doubling up eag is technically a bad idea, but nothing here cares + return Arrays.asList(eag, eag); + } + }.getAddresses(); + } + + private static class NoopHelper extends LoadBalancer.Helper { + @Override + public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { + return null; + } + + @Override + public void updateBalancingState( + ConnectivityState newState, LoadBalancer.SubchannelPicker newPicker) {} + + @Override public void runSerialized(Runnable task) {} + + @Override public NameResolver.Factory getNameResolverFactory() { + return null; + } + + @Override public String getAuthority() { + return null; + } + } + + private static class EmptySubchannel extends LoadBalancer.Subchannel { + @Override public void shutdown() {} + + @Override public void requestConnection() {} + + @Override public Attributes getAttributes() { + return null; + } + } } diff --git a/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java index 8e8865912c..0f9cd106c3 100644 --- a/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java @@ -22,6 +22,7 @@ import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.inOrder; @@ -63,8 +64,6 @@ public class PickFirstLoadBalancerTest { private static final Attributes.Key FOO = Attributes.Key.create("foo"); private Attributes affinity = Attributes.newBuilder().set(FOO, "bar").build(); - @Captor - private ArgumentCaptor eagCaptor; @Captor private ArgumentCaptor pickerCaptor; @Captor @@ -86,7 +85,8 @@ public class PickFirstLoadBalancerTest { } when(mockSubchannel.getAddresses()).thenThrow(new UnsupportedOperationException()); - when(mockHelper.createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class))) + when(mockHelper.createSubchannel( + anyListOf(EquivalentAddressGroup.class), any(Attributes.class))) .thenReturn(mockSubchannel); loadBalancer = (PickFirstBalancer) PickFirstBalancerFactory.getInstance().newLoadBalancer( @@ -102,11 +102,10 @@ public class PickFirstLoadBalancerTest { public void pickAfterResolved() throws Exception { loadBalancer.handleResolvedAddressGroups(servers, affinity); - verify(mockHelper).createSubchannel(eagCaptor.capture(), attrsCaptor.capture()); + verify(mockHelper).createSubchannel(eq(servers), attrsCaptor.capture()); verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel).requestConnection(); - assertEquals(new EquivalentAddressGroup(socketAddresses), eagCaptor.getValue()); assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), pickerCaptor.getValue().pickSubchannel(mockArgs)); @@ -120,12 +119,12 @@ public class PickFirstLoadBalancerTest { loadBalancer.handleResolvedAddressGroups(servers, affinity); verifyNoMoreInteractions(mockSubchannel); - verify(mockHelper).createSubchannel(any(EquivalentAddressGroup.class), + verify(mockHelper).createSubchannel(anyListOf(EquivalentAddressGroup.class), any(Attributes.class)); verify(mockHelper).updateBalancingState(isA(ConnectivityState.class), isA(Picker.class)); // Updating the subchannel addresses is unnecessary, but doesn't hurt anything verify(mockHelper).updateSubchannelAddresses( - eq(mockSubchannel), any(EquivalentAddressGroup.class)); + eq(mockSubchannel), anyListOf(EquivalentAddressGroup.class)); verifyNoMoreInteractions(mockHelper); } @@ -140,15 +139,13 @@ public class PickFirstLoadBalancerTest { InOrder inOrder = inOrder(mockHelper); loadBalancer.handleResolvedAddressGroups(servers, affinity); - inOrder.verify(mockHelper).createSubchannel(eagCaptor.capture(), any(Attributes.class)); + inOrder.verify(mockHelper).createSubchannel(eq(servers), any(Attributes.class)); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel).requestConnection(); - assertEquals(socketAddresses, eagCaptor.getValue().getAddresses()); assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); loadBalancer.handleResolvedAddressGroups(newServers, affinity); - inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eagCaptor.capture()); - assertEquals(newSocketAddresses, eagCaptor.getValue().getAddresses()); + inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eq(newServers)); verifyNoMoreInteractions(mockSubchannel); verifyNoMoreInteractions(mockHelper); @@ -209,8 +206,7 @@ public class PickFirstLoadBalancerTest { verify(mockSubchannel, never()).requestConnection(); loadBalancer.handleResolvedAddressGroups(servers, affinity); - inOrder.verify(mockHelper).createSubchannel(eq(new EquivalentAddressGroup(socketAddresses)), - eq(Attributes.EMPTY)); + inOrder.verify(mockHelper).createSubchannel(eq(servers), eq(Attributes.EMPTY)); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel).requestConnection(); diff --git a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java index 082aa3b4b9..1542a6facd 100644 --- a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java +++ b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java @@ -113,8 +113,8 @@ public class AutoConfiguredLoadBalancerFactoryTest { new EquivalentAddressGroup(new SocketAddress(){}, Attributes.EMPTY)); Helper helper = new TestHelper() { @Override - public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { - assertThat(addrs).isEqualTo(servers.get(0)); + public Subchannel createSubchannel(List addrs, Attributes attrs) { + assertThat(addrs).isEqualTo(servers); return new TestSubchannel(addrs, attrs); } @@ -147,8 +147,8 @@ public class AutoConfiguredLoadBalancerFactoryTest { Attributes.EMPTY)); Helper helper = new TestHelper() { @Override - public Subchannel createSubchannel(final EquivalentAddressGroup addrs, Attributes attrs) { - assertThat(addrs).isEqualTo(servers.get(0)); + public Subchannel createSubchannel(List addrs, Attributes attrs) { + assertThat(addrs).isEqualTo(servers); return new TestSubchannel(addrs, attrs); } @@ -304,7 +304,7 @@ public class AutoConfiguredLoadBalancerFactoryTest { } @Override - public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { + public Subchannel createSubchannel(List addrs, Attributes attrs) { return delegate().createSubchannel(addrs, attrs); } @@ -348,12 +348,12 @@ public class AutoConfiguredLoadBalancerFactoryTest { } private static class TestSubchannel extends Subchannel { - TestSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { + TestSubchannel(List addrs, Attributes attrs) { this.addrs = addrs; this.attrs = attrs; } - final EquivalentAddressGroup addrs; + final List addrs; final Attributes attrs; @Override @@ -365,7 +365,7 @@ public class AutoConfiguredLoadBalancerFactoryTest { } @Override - public EquivalentAddressGroup getAddresses() { + public List getAllAddresses() { return addrs; } diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index 4a35783e86..d67691ec2d 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -42,10 +42,12 @@ import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.Status; import io.grpc.internal.InternalSubchannel.CallTracingTransport; +import io.grpc.internal.InternalSubchannel.Index; import io.grpc.internal.TestUtils.MockClientTransportInfo; import java.net.SocketAddress; import java.util.Arrays; import java.util.LinkedList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; @@ -114,7 +116,6 @@ public class InternalSubchannelTest { }; private InternalSubchannel internalSubchannel; - private EquivalentAddressGroup addressGroup; private BlockingQueue transports; @Before public void setUp() { @@ -133,6 +134,16 @@ public class InternalSubchannelTest { assertEquals(0, fakeExecutor.numPendingTasks()); } + @Test(expected = IllegalArgumentException.class) + public void constructor_emptyEagList_throws() { + createInternalSubchannel(new EquivalentAddressGroup[0]); + } + + @Test(expected = NullPointerException.class) + public void constructor_eagListWithNull_throws() { + createInternalSubchannel(new EquivalentAddressGroup[] {null}); + } + @Test public void singleAddressReconnect() { SocketAddress addr = mock(SocketAddress.class); createInternalSubchannel(addr); @@ -379,6 +390,20 @@ public class InternalSubchannelTest { verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffNanos(); } + @Test(expected = IllegalArgumentException.class) + public void updateAddresses_emptyEagList_throws() { + SocketAddress addr = new FakeSocketAddress(); + createInternalSubchannel(addr); + internalSubchannel.updateAddresses(Arrays.asList()); + } + + @Test(expected = NullPointerException.class) + public void updateAddresses_eagListWithNull_throws() { + SocketAddress addr = new FakeSocketAddress(); + createInternalSubchannel(addr); + internalSubchannel.updateAddresses(Arrays.asList((EquivalentAddressGroup) null)); + } + @Test public void updateAddresses_intersecting_ready() { SocketAddress addr1 = mock(SocketAddress.class); SocketAddress addr2 = mock(SocketAddress.class); @@ -400,7 +425,8 @@ public class InternalSubchannelTest { assertEquals(READY, internalSubchannel.getState()); // Update addresses - internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr2, addr3))); + internalSubchannel.updateAddresses( + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); assertNoCallbackInvoke(); assertEquals(READY, internalSubchannel.getState()); verify(transports.peek().transport, never()).shutdown(any(Status.class)); @@ -442,7 +468,8 @@ public class InternalSubchannelTest { assertEquals(CONNECTING, internalSubchannel.getState()); // Update addresses - internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr2, addr3))); + internalSubchannel.updateAddresses( + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); assertNoCallbackInvoke(); assertEquals(CONNECTING, internalSubchannel.getState()); verify(transports.peek().transport, never()).shutdown(any(Status.class)); @@ -471,7 +498,7 @@ public class InternalSubchannelTest { SocketAddress addr2 = mock(SocketAddress.class); createInternalSubchannel(addr1); - internalSubchannel.updateAddresses(new EquivalentAddressGroup(addr2)); + internalSubchannel.updateAddresses(Arrays.asList(new EquivalentAddressGroup(addr2))); // Nothing happened on address update verify(mockTransportFactory, never()) @@ -519,7 +546,8 @@ public class InternalSubchannelTest { assertEquals(READY, internalSubchannel.getState()); // Update addresses - internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr3, addr4))); + internalSubchannel.updateAddresses( + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4)))); assertExactCallbackInvokes("onStateChange:IDLE"); assertEquals(IDLE, internalSubchannel.getState()); verify(transports.peek().transport).shutdown(any(Status.class)); @@ -561,7 +589,8 @@ public class InternalSubchannelTest { assertEquals(CONNECTING, internalSubchannel.getState()); // Update addresses - internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr3, addr4))); + internalSubchannel.updateAddresses( + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4)))); assertNoCallbackInvoke(); assertEquals(CONNECTING, internalSubchannel.getState()); @@ -946,9 +975,100 @@ public class InternalSubchannelTest { assertEquals(actualTransport.transport.getLogId(), registeredTransport.getLogId()); } + @Test public void index_looping() { + SocketAddress addr1 = new FakeSocketAddress(); + SocketAddress addr2 = new FakeSocketAddress(); + SocketAddress addr3 = new FakeSocketAddress(); + SocketAddress addr4 = new FakeSocketAddress(); + SocketAddress addr5 = new FakeSocketAddress(); + Index index = new Index(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1, addr2)), + new EquivalentAddressGroup(Arrays.asList(addr3)), + new EquivalentAddressGroup(Arrays.asList(addr4, addr5)))); + assertThat(index.getCurrentAddress()).isSameAs(addr1); + assertThat(index.isAtBeginning()).isTrue(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameAs(addr2); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameAs(addr3); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameAs(addr4); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameAs(addr5); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isFalse(); + + index.reset(); + assertThat(index.getCurrentAddress()).isSameAs(addr1); + assertThat(index.isAtBeginning()).isTrue(); + assertThat(index.isValid()).isTrue(); + + // We want to make sure both groupIndex and addressIndex are reset + index.increment(); + index.increment(); + index.increment(); + index.increment(); + assertThat(index.getCurrentAddress()).isSameAs(addr5); + index.reset(); + assertThat(index.getCurrentAddress()).isSameAs(addr1); + } + + @Test public void index_updateGroups_resets() { + SocketAddress addr1 = new FakeSocketAddress(); + SocketAddress addr2 = new FakeSocketAddress(); + SocketAddress addr3 = new FakeSocketAddress(); + Index index = new Index(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1)), + new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); + index.increment(); + index.increment(); + // We want to make sure both groupIndex and addressIndex are reset + index.updateGroups(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1)), + new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); + assertThat(index.getCurrentAddress()).isSameAs(addr1); + } + + @Test public void index_seekTo() { + SocketAddress addr1 = new FakeSocketAddress(); + SocketAddress addr2 = new FakeSocketAddress(); + SocketAddress addr3 = new FakeSocketAddress(); + Index index = new Index(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1, addr2)), + new EquivalentAddressGroup(Arrays.asList(addr3)))); + assertThat(index.seekTo(addr3)).isTrue(); + assertThat(index.getCurrentAddress()).isSameAs(addr3); + assertThat(index.seekTo(addr1)).isTrue(); + assertThat(index.getCurrentAddress()).isSameAs(addr1); + assertThat(index.seekTo(addr2)).isTrue(); + assertThat(index.getCurrentAddress()).isSameAs(addr2); + index.seekTo(new FakeSocketAddress()); + // Failed seekTo doesn't change the index + assertThat(index.getCurrentAddress()).isSameAs(addr2); + } + private void createInternalSubchannel(SocketAddress ... addrs) { - addressGroup = new EquivalentAddressGroup(Arrays.asList(addrs)); - internalSubchannel = new InternalSubchannel(addressGroup, AUTHORITY, USER_AGENT, + createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addrs))); + } + + private void createInternalSubchannel(EquivalentAddressGroup ... addrs) { + List addressGroups = Arrays.asList(addrs); + internalSubchannel = new InternalSubchannel(addressGroups, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(), fakeClock.getStopwatchSupplier(), channelExecutor, mockInternalSubchannelCallback, channelz, CallTracer.getDefaultFactory().create(), null, @@ -970,4 +1090,6 @@ public class InternalSubchannelTest { assertEquals(Arrays.asList(expectedInvokes), callbackInvokes); callbackInvokes.clear(); } + + private static class FakeSocketAddress extends SocketAddress {} } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 2432130b31..42b09a7d1f 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -2064,7 +2064,8 @@ public class ManagedChannelImplTest { assertEquals(TARGET, getStats(channel).target); Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); - assertEquals(addressGroup.toString(), getStats((AbstractSubchannel) subchannel).target); + assertEquals(Collections.singletonList(addressGroup).toString(), + getStats((AbstractSubchannel) subchannel).target); } @Test