From e3ff1ade078fdbb566de5944048eae76b0c87add Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Mon, 2 Jul 2018 14:00:05 -0700 Subject: [PATCH] core: Add support for List in Subchannels This avoids the needs to flatten to EAGs for cases like PickFirst, making the Attributes in EAGs able to be used in communication with core. See #4302 for some discussion on the topic. --- core/src/main/java/io/grpc/LoadBalancer.java | 66 +++++++- .../io/grpc/PickFirstBalancerFactory.java | 22 +-- .../io/grpc/internal/InternalSubchannel.java | 142 +++++++++++++----- .../io/grpc/internal/ManagedChannelImpl.java | 18 ++- .../java/io/grpc/internal/OobChannel.java | 7 +- .../test/java/io/grpc/LoadBalancerTest.java | 118 +++++++++++++++ .../io/grpc/PickFirstLoadBalancerTest.java | 22 ++- ...AutoConfiguredLoadBalancerFactoryTest.java | 16 +- .../grpc/internal/InternalSubchannelTest.java | 138 ++++++++++++++++- .../grpc/internal/ManagedChannelImplTest.java | 3 +- 10 files changed, 453 insertions(+), 99 deletions(-) diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java index 6e08964ee4..5b971c1000 100644 --- a/core/src/main/java/io/grpc/LoadBalancer.java +++ b/core/src/main/java/io/grpc/LoadBalancer.java @@ -19,6 +19,7 @@ package io.grpc; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import java.util.Collections; import java.util.List; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -455,21 +456,61 @@ public abstract class LoadBalancer { *

The LoadBalancer is responsible for closing unused Subchannels, and closing all * Subchannels within {@link #shutdown}. * + *

The default implementation calls {@link #createSubchannel(List, Attributes)}. + * Implementations should not override this method. + * * @since 1.2.0 */ - public abstract Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs); + public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { + Preconditions.checkNotNull(addrs, "addrs"); + return createSubchannel(Collections.singletonList(addrs), attrs); + } + + /** + * Creates a Subchannel, which is a logical connection to the given group of addresses which are + * considered equivalent. The {@code attrs} are custom attributes associated with this + * Subchannel, and can be accessed later through {@link Subchannel#getAttributes + * Subchannel.getAttributes()}. + * + *

The LoadBalancer is responsible for closing unused Subchannels, and closing all + * Subchannels within {@link #shutdown}. + * + * @throws IllegalArgumentException if {@code addrs} is empty + * @since 1.14.0 + */ + public Subchannel createSubchannel(List addrs, Attributes attrs) { + throw new UnsupportedOperationException(); + } /** * Replaces the existing addresses used with {@code subchannel}. This method is superior to * {@link #createSubchannel} when the new and old addresses overlap, since the subchannel can * continue using an existing connection. * + *

The default implementation calls {@link #updateSubchannelAddresses( + * LoadBalancer.Subchannel, List)}. Implementations should not override this method. + * * @throws IllegalArgumentException if {@code subchannel} was not returned from {@link * #createSubchannel} * @since 1.4.0 */ public void updateSubchannelAddresses( Subchannel subchannel, EquivalentAddressGroup addrs) { + Preconditions.checkNotNull(addrs, "addrs"); + updateSubchannelAddresses(subchannel, Collections.singletonList(addrs)); + } + + /** + * Replaces the existing addresses used with {@code subchannel}. This method is superior to + * {@link #createSubchannel} when the new and old addresses overlap, since the subchannel can + * continue using an existing connection. + * + * @throws IllegalArgumentException if {@code subchannel} was not returned from {@link + * #createSubchannel} or {@code addrs} is empty + * @since 1.14.0 + */ + public void updateSubchannelAddresses( + Subchannel subchannel, List addrs) { throw new UnsupportedOperationException(); } @@ -572,11 +613,30 @@ public abstract class LoadBalancer { public abstract void requestConnection(); /** - * Returns the addresses that this Subchannel is bound to. + * Returns the addresses that this Subchannel is bound to. The default implementation calls + * getAllAddresses(). * + *

The default implementation calls {@link #getAllAddresses()}. Implementations should not + * override this method. + * + * @throws IllegalStateException if this subchannel has more than one EquivalentAddressGroup. + * Use getAllAddresses() instead * @since 1.2.0 */ - public abstract EquivalentAddressGroup getAddresses(); + public EquivalentAddressGroup getAddresses() { + List groups = getAllAddresses(); + Preconditions.checkState(groups.size() == 1, "Does not have exactly one group"); + return groups.get(0); + } + + /** + * Returns the addresses that this Subchannel is bound to. The returned list will not be empty. + * + * @since 1.14.0 + */ + public List getAllAddresses() { + throw new UnsupportedOperationException(); + } /** * The same attributes passed to {@link Helper#createSubchannel Helper.createSubchannel()}. diff --git a/core/src/main/java/io/grpc/PickFirstBalancerFactory.java b/core/src/main/java/io/grpc/PickFirstBalancerFactory.java index 1bca5ffe41..86677d3e2f 100644 --- a/core/src/main/java/io/grpc/PickFirstBalancerFactory.java +++ b/core/src/main/java/io/grpc/PickFirstBalancerFactory.java @@ -26,8 +26,6 @@ import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; -import java.net.SocketAddress; -import java.util.ArrayList; import java.util.List; /** @@ -67,19 +65,15 @@ public final class PickFirstBalancerFactory extends LoadBalancer.Factory { @Override public void handleResolvedAddressGroups( List servers, Attributes attributes) { - // Flatten servers list received from name resolver into single address group. This means that - // as far as load balancer is concerned, there's virtually one single server with multiple - // addresses so the connection will be created only for the first address (pick first). - EquivalentAddressGroup newEag = flattenEquivalentAddressGroup(servers); if (subchannel == null) { - subchannel = helper.createSubchannel(newEag, Attributes.EMPTY); + subchannel = helper.createSubchannel(servers, Attributes.EMPTY); // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel))); subchannel.requestConnection(); } else { - helper.updateSubchannelAddresses(subchannel, newEag); + helper.updateSubchannelAddresses(subchannel, servers); } } @@ -126,18 +120,6 @@ public final class PickFirstBalancerFactory extends LoadBalancer.Factory { subchannel.shutdown(); } } - - /** - * Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object. - */ - private static EquivalentAddressGroup flattenEquivalentAddressGroup( - List groupList) { - List addrs = new ArrayList(); - for (EquivalentAddressGroup group : groupList) { - addrs.addAll(group.getAddresses()); - } - return new EquivalentAddressGroup(addrs); - } } /** diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 1c8f59595e..8e8c22ba51 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -42,6 +42,7 @@ import io.grpc.internal.Channelz.ChannelTrace; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -86,15 +87,12 @@ final class InternalSubchannel implements Instrumented { // 3. Every synchronized("lock") must be inside a try-finally which calls drain() in "finally". private final ChannelExecutor channelExecutor; - @GuardedBy("lock") - private EquivalentAddressGroup addressGroup; - /** - * The index of the address corresponding to pendingTransport/activeTransport, or 0 if both are - * null. + * The index of the address corresponding to pendingTransport/activeTransport, or at beginning if + * both are null. */ @GuardedBy("lock") - private int addressIndex; + private Index addressIndex; /** * The policy to control back off between reconnects. Non-{@code null} when a reconnect task is @@ -159,13 +157,17 @@ final class InternalSubchannel implements Instrumented { @GuardedBy("lock") private Status shutdownReason; - InternalSubchannel(EquivalentAddressGroup addressGroup, String authority, String userAgent, + InternalSubchannel(List addressGroups, String authority, String userAgent, BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, Supplier stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback, Channelz channelz, CallTracer callsTracer, @Nullable ChannelTracer channelTracer, TimeProvider timeProvider) { - this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup"); + Preconditions.checkNotNull(addressGroups, "addressGroups"); + Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty"); + checkListHasNoNulls(addressGroups, "addressGroups contains null entry"); + this.addressIndex = new Index( + Collections.unmodifiableList(new ArrayList(addressGroups))); this.authority = authority; this.userAgent = userAgent; this.backoffPolicyProvider = backoffPolicyProvider; @@ -213,11 +215,10 @@ final class InternalSubchannel implements Instrumented { private void startNewTransport() { Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled"); - if (addressIndex == 0) { + if (addressIndex.isAtBeginning()) { connectingTimer.reset().start(); } - List addrs = addressGroup.getAddresses(); - SocketAddress address = addrs.get(addressIndex); + SocketAddress address = addressIndex.getCurrentAddress(); ProxyParameters proxy = null; if (address instanceof PairSocketAddress) { @@ -336,28 +337,29 @@ final class InternalSubchannel implements Instrumented { } /** Replaces the existing addresses, avoiding unnecessary reconnects. */ - public void updateAddresses(EquivalentAddressGroup newAddressGroup) { + public void updateAddresses(List newAddressGroups) { + Preconditions.checkNotNull(newAddressGroups, "newAddressGroups"); + checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry"); + Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty"); + newAddressGroups = + Collections.unmodifiableList(new ArrayList(newAddressGroups)); ManagedClientTransport savedTransport = null; try { synchronized (lock) { - EquivalentAddressGroup oldAddressGroup = addressGroup; - addressGroup = newAddressGroup; + SocketAddress previousAddress = addressIndex.getCurrentAddress(); + addressIndex.updateGroups(newAddressGroups); if (state.getState() == READY || state.getState() == CONNECTING) { - SocketAddress address = oldAddressGroup.getAddresses().get(addressIndex); - int newIndex = newAddressGroup.getAddresses().indexOf(address); - if (newIndex != -1) { - addressIndex = newIndex; - } else { + if (!addressIndex.seekTo(previousAddress)) { // Forced to drop the connection if (state.getState() == READY) { savedTransport = activeTransport; activeTransport = null; - addressIndex = 0; + addressIndex.reset(); gotoNonErrorState(IDLE); } else { savedTransport = pendingTransport; pendingTransport = null; - addressIndex = 0; + addressIndex.reset(); startNewTransport(); } } @@ -387,7 +389,7 @@ final class InternalSubchannel implements Instrumented { savedPendingTransport = pendingTransport; activeTransport = null; pendingTransport = null; - addressIndex = 0; + addressIndex.reset(); if (transports.isEmpty()) { handleTermination(); if (log.isLoggable(Level.FINE)) { @@ -409,15 +411,15 @@ final class InternalSubchannel implements Instrumented { @Override public String toString() { - // addressGroupCopy being a little stale is fine, just avoid calling toString with the lock + // addressGroupsCopy being a little stale is fine, just avoid calling toString with the lock // since there may be many addresses. - Object addressGroupCopy; + Object addressGroupsCopy; synchronized (lock) { - addressGroupCopy = addressGroup; + addressGroupsCopy = addressIndex.getGroups(); } return MoreObjects.toStringHelper(this) .add("logId", logId.getId()) - .add("addressGroup", addressGroupCopy) + .add("addressGroups", addressGroupsCopy) .toString(); } @@ -456,10 +458,10 @@ final class InternalSubchannel implements Instrumented { } } - EquivalentAddressGroup getAddressGroup() { + List getAddressGroups() { try { synchronized (lock) { - return addressGroup; + return addressIndex.getGroups(); } } finally { channelExecutor.drain(); @@ -487,14 +489,14 @@ final class InternalSubchannel implements Instrumented { SettableFuture ret = SettableFuture.create(); ChannelStats.Builder builder = new ChannelStats.Builder(); - EquivalentAddressGroup addressGroupSnapshot; + List addressGroupsSnapshot; List transportsSnapshot; synchronized (lock) { - addressGroupSnapshot = addressGroup; + addressGroupsSnapshot = addressIndex.getGroups(); transportsSnapshot = new ArrayList(transports); } - builder.setTarget(addressGroupSnapshot.toString()).setState(getState()); + builder.setTarget(addressGroupsSnapshot.toString()).setState(getState()); builder.setSockets(transportsSnapshot); callsTracer.updateBuilder(builder); if (channelTracer != null) { @@ -515,6 +517,12 @@ final class InternalSubchannel implements Instrumented { } } + private static void checkListHasNoNulls(List list, String msg) { + for (Object item : list) { + Preconditions.checkNotNull(item, msg); + } + } + /** Listener for real transports. */ private class TransportListener implements ManagedClientTransport.Listener { final ConnectionClientTransport transport; @@ -573,15 +581,15 @@ final class InternalSubchannel implements Instrumented { if (activeTransport == transport) { gotoNonErrorState(IDLE); activeTransport = null; - addressIndex = 0; + addressIndex.reset(); } else if (pendingTransport == transport) { Preconditions.checkState(state.getState() == CONNECTING, "Expected state is CONNECTING, actual state is %s", state.getState()); - addressIndex++; + addressIndex.increment(); // Continue reconnect if there are still addresses to try. - if (addressIndex >= addressGroup.getAddresses().size()) { + if (!addressIndex.isValid()) { pendingTransport = null; - addressIndex = 0; + addressIndex.reset(); // Initiate backoff // Transition to TRANSIENT_FAILURE scheduleBackoff(s); @@ -703,4 +711,68 @@ final class InternalSubchannel implements Instrumented { }; } } + + /** Index as in 'i', the pointer to an entry. Not a "search index." */ + @VisibleForTesting + static final class Index { + private List addressGroups; + private int groupIndex; + private int addressIndex; + + public Index(List groups) { + this.addressGroups = groups; + } + + public boolean isValid() { + // addressIndex will never be invalid + return groupIndex < addressGroups.size(); + } + + public boolean isAtBeginning() { + return groupIndex == 0 && addressIndex == 0; + } + + public void increment() { + EquivalentAddressGroup group = addressGroups.get(groupIndex); + addressIndex++; + if (addressIndex >= group.getAddresses().size()) { + groupIndex++; + addressIndex = 0; + } + } + + public void reset() { + groupIndex = 0; + addressIndex = 0; + } + + public SocketAddress getCurrentAddress() { + return addressGroups.get(groupIndex).getAddresses().get(addressIndex); + } + + public List getGroups() { + return addressGroups; + } + + /** Update to new groups, resetting the current index. */ + public void updateGroups(List newGroups) { + addressGroups = newGroups; + reset(); + } + + /** Returns false if the needle was not found and the current index was left unchanged. */ + public boolean seekTo(SocketAddress needle) { + for (int i = 0; i < addressGroups.size(); i++) { + EquivalentAddressGroup group = addressGroups.get(i); + int j = group.getAddresses().indexOf(needle); + if (j == -1) { + continue; + } + this.groupIndex = i; + this.addressIndex = j; + return true; + } + return false; + } + } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 9a9ac32ffe..7b6fc9e015 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -61,6 +61,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -1007,8 +1008,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented addressGroups, Attributes attrs) { + checkNotNull(addressGroups, "addressGroups"); checkNotNull(attrs, "attrs"); // TODO(ejona): can we be even stricter? Like loadBalancer == null? checkState(!terminated, "Channel is terminated"); @@ -1019,7 +1020,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented addrs) { checkArgument(subchannel instanceof SubchannelImpl, "subchannel must have been returned from createSubchannel"); ((SubchannelImpl) subchannel).subchannel.updateAddresses(addrs); @@ -1154,7 +1155,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented getAllAddresses() { + return subchannel.getAddressGroups(); } @Override diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index 1b1829a1cc..95368c0e73 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -43,6 +43,7 @@ import io.grpc.internal.Channelz.ChannelStats; import io.grpc.internal.Channelz.ChannelTrace; import io.grpc.internal.ClientCallImpl.ClientTransportProvider; import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -159,8 +160,8 @@ final class OobChannel extends ManagedChannel implements Instrumented getAllAddresses() { + return subchannel.getAddressGroups(); } @Override @@ -181,7 +182,7 @@ final class OobChannel extends ManagedChannel implements Instrumented addrsIn, Attributes attrsIn) { + assertThat(addrsIn).hasSize(1); + assertThat(addrsIn.get(0)).isSameAs(eag); + assertThat(attrsIn).isSameAs(attrs); + ran = true; + return subchannel; + } + } + + OverrideCreateSubchannel helper = new OverrideCreateSubchannel(); + assertThat(helper.createSubchannel(eag, attrs)).isSameAs(subchannel); + assertThat(helper.ran).isTrue(); + } + + @Test(expected = UnsupportedOperationException.class) + public void helper_createSubchannelList_throws() { + new NoopHelper().createSubchannel(Arrays.asList(eag), attrs); + } + + @Test + public void helper_updateSubchannelAddresses_delegates() { + class OverrideUpdateSubchannel extends NoopHelper { + boolean ran; + + @Override + public void updateSubchannelAddresses( + Subchannel subchannelIn, List addrsIn) { + assertThat(subchannelIn).isSameAs(emptySubchannel); + assertThat(addrsIn).hasSize(1); + assertThat(addrsIn.get(0)).isSameAs(eag); + ran = true; + } + } + + OverrideUpdateSubchannel helper = new OverrideUpdateSubchannel(); + helper.updateSubchannelAddresses(emptySubchannel, eag); + assertThat(helper.ran).isTrue(); + } + + @Test(expected = UnsupportedOperationException.class) + public void helper_updateSubchannelAddressesList_throws() { + new NoopHelper().updateSubchannelAddresses(null, Arrays.asList(eag)); + } + + @Test + public void subchannel_getAddresses_delegates() { + class OverrideGetAllAddresses extends EmptySubchannel { + boolean ran; + + @Override public List getAllAddresses() { + ran = true; + return Arrays.asList(eag); + } + } + + OverrideGetAllAddresses subchannel = new OverrideGetAllAddresses(); + assertThat(subchannel.getAddresses()).isEqualTo(eag); + assertThat(subchannel.ran).isTrue(); + } + + @Test(expected = IllegalStateException.class) + public void subchannel_getAddresses_throwsOnTwoAddrs() { + new EmptySubchannel() { + boolean ran; + + @Override public List getAllAddresses() { + ran = true; + // Doubling up eag is technically a bad idea, but nothing here cares + return Arrays.asList(eag, eag); + } + }.getAddresses(); + } + + private static class NoopHelper extends LoadBalancer.Helper { + @Override + public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { + return null; + } + + @Override + public void updateBalancingState( + ConnectivityState newState, LoadBalancer.SubchannelPicker newPicker) {} + + @Override public void runSerialized(Runnable task) {} + + @Override public NameResolver.Factory getNameResolverFactory() { + return null; + } + + @Override public String getAuthority() { + return null; + } + } + + private static class EmptySubchannel extends LoadBalancer.Subchannel { + @Override public void shutdown() {} + + @Override public void requestConnection() {} + + @Override public Attributes getAttributes() { + return null; + } + } } diff --git a/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java index 8e8865912c..0f9cd106c3 100644 --- a/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java @@ -22,6 +22,7 @@ import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.inOrder; @@ -63,8 +64,6 @@ public class PickFirstLoadBalancerTest { private static final Attributes.Key FOO = Attributes.Key.create("foo"); private Attributes affinity = Attributes.newBuilder().set(FOO, "bar").build(); - @Captor - private ArgumentCaptor eagCaptor; @Captor private ArgumentCaptor pickerCaptor; @Captor @@ -86,7 +85,8 @@ public class PickFirstLoadBalancerTest { } when(mockSubchannel.getAddresses()).thenThrow(new UnsupportedOperationException()); - when(mockHelper.createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class))) + when(mockHelper.createSubchannel( + anyListOf(EquivalentAddressGroup.class), any(Attributes.class))) .thenReturn(mockSubchannel); loadBalancer = (PickFirstBalancer) PickFirstBalancerFactory.getInstance().newLoadBalancer( @@ -102,11 +102,10 @@ public class PickFirstLoadBalancerTest { public void pickAfterResolved() throws Exception { loadBalancer.handleResolvedAddressGroups(servers, affinity); - verify(mockHelper).createSubchannel(eagCaptor.capture(), attrsCaptor.capture()); + verify(mockHelper).createSubchannel(eq(servers), attrsCaptor.capture()); verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel).requestConnection(); - assertEquals(new EquivalentAddressGroup(socketAddresses), eagCaptor.getValue()); assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), pickerCaptor.getValue().pickSubchannel(mockArgs)); @@ -120,12 +119,12 @@ public class PickFirstLoadBalancerTest { loadBalancer.handleResolvedAddressGroups(servers, affinity); verifyNoMoreInteractions(mockSubchannel); - verify(mockHelper).createSubchannel(any(EquivalentAddressGroup.class), + verify(mockHelper).createSubchannel(anyListOf(EquivalentAddressGroup.class), any(Attributes.class)); verify(mockHelper).updateBalancingState(isA(ConnectivityState.class), isA(Picker.class)); // Updating the subchannel addresses is unnecessary, but doesn't hurt anything verify(mockHelper).updateSubchannelAddresses( - eq(mockSubchannel), any(EquivalentAddressGroup.class)); + eq(mockSubchannel), anyListOf(EquivalentAddressGroup.class)); verifyNoMoreInteractions(mockHelper); } @@ -140,15 +139,13 @@ public class PickFirstLoadBalancerTest { InOrder inOrder = inOrder(mockHelper); loadBalancer.handleResolvedAddressGroups(servers, affinity); - inOrder.verify(mockHelper).createSubchannel(eagCaptor.capture(), any(Attributes.class)); + inOrder.verify(mockHelper).createSubchannel(eq(servers), any(Attributes.class)); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel).requestConnection(); - assertEquals(socketAddresses, eagCaptor.getValue().getAddresses()); assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); loadBalancer.handleResolvedAddressGroups(newServers, affinity); - inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eagCaptor.capture()); - assertEquals(newSocketAddresses, eagCaptor.getValue().getAddresses()); + inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eq(newServers)); verifyNoMoreInteractions(mockSubchannel); verifyNoMoreInteractions(mockHelper); @@ -209,8 +206,7 @@ public class PickFirstLoadBalancerTest { verify(mockSubchannel, never()).requestConnection(); loadBalancer.handleResolvedAddressGroups(servers, affinity); - inOrder.verify(mockHelper).createSubchannel(eq(new EquivalentAddressGroup(socketAddresses)), - eq(Attributes.EMPTY)); + inOrder.verify(mockHelper).createSubchannel(eq(servers), eq(Attributes.EMPTY)); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockSubchannel).requestConnection(); diff --git a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java index 082aa3b4b9..1542a6facd 100644 --- a/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java +++ b/core/src/test/java/io/grpc/internal/AutoConfiguredLoadBalancerFactoryTest.java @@ -113,8 +113,8 @@ public class AutoConfiguredLoadBalancerFactoryTest { new EquivalentAddressGroup(new SocketAddress(){}, Attributes.EMPTY)); Helper helper = new TestHelper() { @Override - public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { - assertThat(addrs).isEqualTo(servers.get(0)); + public Subchannel createSubchannel(List addrs, Attributes attrs) { + assertThat(addrs).isEqualTo(servers); return new TestSubchannel(addrs, attrs); } @@ -147,8 +147,8 @@ public class AutoConfiguredLoadBalancerFactoryTest { Attributes.EMPTY)); Helper helper = new TestHelper() { @Override - public Subchannel createSubchannel(final EquivalentAddressGroup addrs, Attributes attrs) { - assertThat(addrs).isEqualTo(servers.get(0)); + public Subchannel createSubchannel(List addrs, Attributes attrs) { + assertThat(addrs).isEqualTo(servers); return new TestSubchannel(addrs, attrs); } @@ -304,7 +304,7 @@ public class AutoConfiguredLoadBalancerFactoryTest { } @Override - public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { + public Subchannel createSubchannel(List addrs, Attributes attrs) { return delegate().createSubchannel(addrs, attrs); } @@ -348,12 +348,12 @@ public class AutoConfiguredLoadBalancerFactoryTest { } private static class TestSubchannel extends Subchannel { - TestSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { + TestSubchannel(List addrs, Attributes attrs) { this.addrs = addrs; this.attrs = attrs; } - final EquivalentAddressGroup addrs; + final List addrs; final Attributes attrs; @Override @@ -365,7 +365,7 @@ public class AutoConfiguredLoadBalancerFactoryTest { } @Override - public EquivalentAddressGroup getAddresses() { + public List getAllAddresses() { return addrs; } diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index 4a35783e86..d67691ec2d 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -42,10 +42,12 @@ import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.Status; import io.grpc.internal.InternalSubchannel.CallTracingTransport; +import io.grpc.internal.InternalSubchannel.Index; import io.grpc.internal.TestUtils.MockClientTransportInfo; import java.net.SocketAddress; import java.util.Arrays; import java.util.LinkedList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; @@ -114,7 +116,6 @@ public class InternalSubchannelTest { }; private InternalSubchannel internalSubchannel; - private EquivalentAddressGroup addressGroup; private BlockingQueue transports; @Before public void setUp() { @@ -133,6 +134,16 @@ public class InternalSubchannelTest { assertEquals(0, fakeExecutor.numPendingTasks()); } + @Test(expected = IllegalArgumentException.class) + public void constructor_emptyEagList_throws() { + createInternalSubchannel(new EquivalentAddressGroup[0]); + } + + @Test(expected = NullPointerException.class) + public void constructor_eagListWithNull_throws() { + createInternalSubchannel(new EquivalentAddressGroup[] {null}); + } + @Test public void singleAddressReconnect() { SocketAddress addr = mock(SocketAddress.class); createInternalSubchannel(addr); @@ -379,6 +390,20 @@ public class InternalSubchannelTest { verify(mockBackoffPolicy3, times(backoff3Consulted)).nextBackoffNanos(); } + @Test(expected = IllegalArgumentException.class) + public void updateAddresses_emptyEagList_throws() { + SocketAddress addr = new FakeSocketAddress(); + createInternalSubchannel(addr); + internalSubchannel.updateAddresses(Arrays.asList()); + } + + @Test(expected = NullPointerException.class) + public void updateAddresses_eagListWithNull_throws() { + SocketAddress addr = new FakeSocketAddress(); + createInternalSubchannel(addr); + internalSubchannel.updateAddresses(Arrays.asList((EquivalentAddressGroup) null)); + } + @Test public void updateAddresses_intersecting_ready() { SocketAddress addr1 = mock(SocketAddress.class); SocketAddress addr2 = mock(SocketAddress.class); @@ -400,7 +425,8 @@ public class InternalSubchannelTest { assertEquals(READY, internalSubchannel.getState()); // Update addresses - internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr2, addr3))); + internalSubchannel.updateAddresses( + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); assertNoCallbackInvoke(); assertEquals(READY, internalSubchannel.getState()); verify(transports.peek().transport, never()).shutdown(any(Status.class)); @@ -442,7 +468,8 @@ public class InternalSubchannelTest { assertEquals(CONNECTING, internalSubchannel.getState()); // Update addresses - internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr2, addr3))); + internalSubchannel.updateAddresses( + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); assertNoCallbackInvoke(); assertEquals(CONNECTING, internalSubchannel.getState()); verify(transports.peek().transport, never()).shutdown(any(Status.class)); @@ -471,7 +498,7 @@ public class InternalSubchannelTest { SocketAddress addr2 = mock(SocketAddress.class); createInternalSubchannel(addr1); - internalSubchannel.updateAddresses(new EquivalentAddressGroup(addr2)); + internalSubchannel.updateAddresses(Arrays.asList(new EquivalentAddressGroup(addr2))); // Nothing happened on address update verify(mockTransportFactory, never()) @@ -519,7 +546,8 @@ public class InternalSubchannelTest { assertEquals(READY, internalSubchannel.getState()); // Update addresses - internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr3, addr4))); + internalSubchannel.updateAddresses( + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4)))); assertExactCallbackInvokes("onStateChange:IDLE"); assertEquals(IDLE, internalSubchannel.getState()); verify(transports.peek().transport).shutdown(any(Status.class)); @@ -561,7 +589,8 @@ public class InternalSubchannelTest { assertEquals(CONNECTING, internalSubchannel.getState()); // Update addresses - internalSubchannel.updateAddresses(new EquivalentAddressGroup(Arrays.asList(addr3, addr4))); + internalSubchannel.updateAddresses( + Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr3, addr4)))); assertNoCallbackInvoke(); assertEquals(CONNECTING, internalSubchannel.getState()); @@ -946,9 +975,100 @@ public class InternalSubchannelTest { assertEquals(actualTransport.transport.getLogId(), registeredTransport.getLogId()); } + @Test public void index_looping() { + SocketAddress addr1 = new FakeSocketAddress(); + SocketAddress addr2 = new FakeSocketAddress(); + SocketAddress addr3 = new FakeSocketAddress(); + SocketAddress addr4 = new FakeSocketAddress(); + SocketAddress addr5 = new FakeSocketAddress(); + Index index = new Index(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1, addr2)), + new EquivalentAddressGroup(Arrays.asList(addr3)), + new EquivalentAddressGroup(Arrays.asList(addr4, addr5)))); + assertThat(index.getCurrentAddress()).isSameAs(addr1); + assertThat(index.isAtBeginning()).isTrue(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameAs(addr2); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameAs(addr3); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameAs(addr4); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.getCurrentAddress()).isSameAs(addr5); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isTrue(); + + index.increment(); + assertThat(index.isAtBeginning()).isFalse(); + assertThat(index.isValid()).isFalse(); + + index.reset(); + assertThat(index.getCurrentAddress()).isSameAs(addr1); + assertThat(index.isAtBeginning()).isTrue(); + assertThat(index.isValid()).isTrue(); + + // We want to make sure both groupIndex and addressIndex are reset + index.increment(); + index.increment(); + index.increment(); + index.increment(); + assertThat(index.getCurrentAddress()).isSameAs(addr5); + index.reset(); + assertThat(index.getCurrentAddress()).isSameAs(addr1); + } + + @Test public void index_updateGroups_resets() { + SocketAddress addr1 = new FakeSocketAddress(); + SocketAddress addr2 = new FakeSocketAddress(); + SocketAddress addr3 = new FakeSocketAddress(); + Index index = new Index(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1)), + new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); + index.increment(); + index.increment(); + // We want to make sure both groupIndex and addressIndex are reset + index.updateGroups(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1)), + new EquivalentAddressGroup(Arrays.asList(addr2, addr3)))); + assertThat(index.getCurrentAddress()).isSameAs(addr1); + } + + @Test public void index_seekTo() { + SocketAddress addr1 = new FakeSocketAddress(); + SocketAddress addr2 = new FakeSocketAddress(); + SocketAddress addr3 = new FakeSocketAddress(); + Index index = new Index(Arrays.asList( + new EquivalentAddressGroup(Arrays.asList(addr1, addr2)), + new EquivalentAddressGroup(Arrays.asList(addr3)))); + assertThat(index.seekTo(addr3)).isTrue(); + assertThat(index.getCurrentAddress()).isSameAs(addr3); + assertThat(index.seekTo(addr1)).isTrue(); + assertThat(index.getCurrentAddress()).isSameAs(addr1); + assertThat(index.seekTo(addr2)).isTrue(); + assertThat(index.getCurrentAddress()).isSameAs(addr2); + index.seekTo(new FakeSocketAddress()); + // Failed seekTo doesn't change the index + assertThat(index.getCurrentAddress()).isSameAs(addr2); + } + private void createInternalSubchannel(SocketAddress ... addrs) { - addressGroup = new EquivalentAddressGroup(Arrays.asList(addrs)); - internalSubchannel = new InternalSubchannel(addressGroup, AUTHORITY, USER_AGENT, + createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addrs))); + } + + private void createInternalSubchannel(EquivalentAddressGroup ... addrs) { + List addressGroups = Arrays.asList(addrs); + internalSubchannel = new InternalSubchannel(addressGroups, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(), fakeClock.getStopwatchSupplier(), channelExecutor, mockInternalSubchannelCallback, channelz, CallTracer.getDefaultFactory().create(), null, @@ -970,4 +1090,6 @@ public class InternalSubchannelTest { assertEquals(Arrays.asList(expectedInvokes), callbackInvokes); callbackInvokes.clear(); } + + private static class FakeSocketAddress extends SocketAddress {} } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 2432130b31..42b09a7d1f 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -2064,7 +2064,8 @@ public class ManagedChannelImplTest { assertEquals(TARGET, getStats(channel).target); Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); - assertEquals(addressGroup.toString(), getStats((AbstractSubchannel) subchannel).target); + assertEquals(Collections.singletonList(addressGroup).toString(), + getStats((AbstractSubchannel) subchannel).target); } @Test