diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java index 2c16b101dc..03f2754f3d 100644 --- a/core/src/main/java/io/grpc/LoadBalancer.java +++ b/core/src/main/java/io/grpc/LoadBalancer.java @@ -454,20 +454,12 @@ public abstract class LoadBalancer { @ThreadSafe public abstract static class Helper { /** - * Creates a Subchannel, which is a logical connection to the given group of addresses which are - * considered equivalent. The {@code attrs} are custom attributes associated with this - * Subchannel, and can be accessed later through {@link Subchannel#getAttributes - * Subchannel.getAttributes()}. - * - *

The LoadBalancer is responsible for closing unused Subchannels, and closing all - * Subchannels within {@link #shutdown}. - * - *

The default implementation calls {@link #createSubchannel(List, Attributes)}. - * Implementations should not override this method. + * Equivalent to {@link #createSubchannel(List, Attributes)} with the given single {@code + * EquivalentAddressGroup}. * * @since 1.2.0 */ - public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { + public final Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { Preconditions.checkNotNull(addrs, "addrs"); return createSubchannel(Collections.singletonList(addrs), attrs); } @@ -489,18 +481,12 @@ public abstract class LoadBalancer { } /** - * Replaces the existing addresses used with {@code subchannel}. This method is superior to - * {@link #createSubchannel} when the new and old addresses overlap, since the subchannel can - * continue using an existing connection. + * Equivalent to {@link #updateSubchannelAddresses(io.grpc.LoadBalancer.Subchannel, List)} with + * the given single {@code EquivalentAddressGroup}. * - *

The default implementation calls {@link #updateSubchannelAddresses( - * LoadBalancer.Subchannel, List)}. Implementations should not override this method. - * - * @throws IllegalArgumentException if {@code subchannel} was not returned from {@link - * #createSubchannel} * @since 1.4.0 */ - public void updateSubchannelAddresses( + public final void updateSubchannelAddresses( Subchannel subchannel, EquivalentAddressGroup addrs) { Preconditions.checkNotNull(addrs, "addrs"); updateSubchannelAddresses(subchannel, Collections.singletonList(addrs)); @@ -622,17 +608,15 @@ public abstract class LoadBalancer { public abstract void requestConnection(); /** - * Returns the addresses that this Subchannel is bound to. The default implementation calls - * getAllAddresses(). - * - *

The default implementation calls {@link #getAllAddresses()}. Implementations should not - * override this method. + * Returns the addresses that this Subchannel is bound to. This can be called only if + * the Subchannel has only one {@link EquivalentAddressGroup}. Under the hood it calls + * {@link #getAllAddresses}. * * @throws IllegalStateException if this subchannel has more than one EquivalentAddressGroup. - * Use getAllAddresses() instead + * Use {@link #getAllAddresses} instead * @since 1.2.0 */ - public EquivalentAddressGroup getAddresses() { + public final EquivalentAddressGroup getAddresses() { List groups = getAllAddresses(); Preconditions.checkState(groups.size() == 1, "Does not have exactly one group"); return groups.get(0); diff --git a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java index e2a9720d47..1d26fb351e 100644 --- a/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java +++ b/core/src/main/java/io/grpc/util/ForwardingLoadBalancerHelper.java @@ -35,22 +35,11 @@ public abstract class ForwardingLoadBalancerHelper extends LoadBalancer.Helper { */ protected abstract LoadBalancer.Helper delegate(); - @Override - public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { - return delegate().createSubchannel(addrs, attrs); - } - @Override public Subchannel createSubchannel(List addrs, Attributes attrs) { return delegate().createSubchannel(addrs, attrs); } - @Override - public void updateSubchannelAddresses( - Subchannel subchannel, EquivalentAddressGroup addrs) { - delegate().updateSubchannelAddresses(subchannel, addrs); - } - @Override public void updateSubchannelAddresses( Subchannel subchannel, List addrs) { diff --git a/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java index 30c31e2544..363b0e824b 100644 --- a/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java @@ -84,7 +84,7 @@ public class PickFirstLoadBalancerTest { socketAddresses.add(addr); } - when(mockSubchannel.getAddresses()).thenThrow(new UnsupportedOperationException()); + when(mockSubchannel.getAllAddresses()).thenThrow(new UnsupportedOperationException()); when(mockHelper.createSubchannel( anyListOf(EquivalentAddressGroup.class), any(Attributes.class))) .thenReturn(mockSubchannel); diff --git a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index fdca3b3392..a6302e21fd 100644 --- a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -91,7 +91,7 @@ import org.mockito.stubbing.Answer; public class RoundRobinLoadBalancerTest { private RoundRobinLoadBalancer loadBalancer; private List servers = Lists.newArrayList(); - private Map subchannels = Maps.newLinkedHashMap(); + private Map, Subchannel> subchannels = Maps.newLinkedHashMap(); private static final Attributes.Key MAJOR_KEY = Attributes.Key.create("major-key"); private Attributes affinity = Attributes.newBuilder().set(MAJOR_KEY, "I got the keys").build(); @@ -100,7 +100,7 @@ public class RoundRobinLoadBalancerTest { @Captor private ArgumentCaptor stateCaptor; @Captor - private ArgumentCaptor eagCaptor; + private ArgumentCaptor> eagListCaptor; @Mock private Helper mockHelper; @@ -108,6 +108,7 @@ public class RoundRobinLoadBalancerTest { private PickSubchannelArgs mockArgs; @Before + @SuppressWarnings("unchecked") public void setUp() { MockitoAnnotations.initMocks(this); @@ -116,11 +117,11 @@ public class RoundRobinLoadBalancerTest { EquivalentAddressGroup eag = new EquivalentAddressGroup(addr); servers.add(eag); Subchannel sc = mock(Subchannel.class); - when(sc.getAddresses()).thenReturn(eag); - subchannels.put(eag, sc); + when(sc.getAllAddresses()).thenReturn(Arrays.asList(eag)); + subchannels.put(Arrays.asList(eag), sc); } - when(mockHelper.createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class))) + when(mockHelper.createSubchannel(any(List.class), any(Attributes.class))) .then(new Answer() { @Override public Subchannel answer(InvocationOnMock invocation) throws Throwable { @@ -146,10 +147,10 @@ public class RoundRobinLoadBalancerTest { loadBalancer.handleResolvedAddressGroups(servers, affinity); loadBalancer.handleSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY)); - verify(mockHelper, times(3)).createSubchannel(eagCaptor.capture(), + verify(mockHelper, times(3)).createSubchannel(eagListCaptor.capture(), any(Attributes.class)); - assertThat(eagCaptor.getAllValues()).containsAllIn(subchannels.keySet()); + assertThat(eagListCaptor.getAllValues()).containsAllIn(subchannels.keySet()); for (Subchannel subchannel : subchannels.values()) { verify(subchannel).requestConnection(); verify(subchannel, never()).shutdown(); @@ -165,26 +166,34 @@ public class RoundRobinLoadBalancerTest { verifyNoMoreInteractions(mockHelper); } + @SuppressWarnings("unchecked") @Test public void pickAfterResolvedUpdatedHosts() throws Exception { Subchannel removedSubchannel = mock(Subchannel.class); Subchannel oldSubchannel = mock(Subchannel.class); Subchannel newSubchannel = mock(Subchannel.class); - for (Subchannel subchannel : Lists.newArrayList(removedSubchannel, oldSubchannel, - newSubchannel)) { - when(subchannel.getAttributes()).thenReturn(Attributes.newBuilder().set(STATE_INFO, - new Ref( - ConnectivityStateInfo.forNonError(READY))).build()); - } - FakeSocketAddress removedAddr = new FakeSocketAddress("removed"); FakeSocketAddress oldAddr = new FakeSocketAddress("old"); FakeSocketAddress newAddr = new FakeSocketAddress("new"); - final Map subchannels2 = Maps.newHashMap(); - subchannels2.put(new EquivalentAddressGroup(removedAddr), removedSubchannel); - subchannels2.put(new EquivalentAddressGroup(oldAddr), oldSubchannel); + List allSubchannels = + Lists.newArrayList(removedSubchannel, oldSubchannel, newSubchannel); + List allAddrs = + Lists.newArrayList(removedAddr, oldAddr, newAddr); + for (int i = 0; i < allSubchannels.size(); i++) { + Subchannel subchannel = allSubchannels.get(i); + List eagList = + Arrays.asList(new EquivalentAddressGroup(allAddrs.get(i))); + when(subchannel.getAttributes()).thenReturn(Attributes.newBuilder().set(STATE_INFO, + new Ref( + ConnectivityStateInfo.forNonError(READY))).build()); + when(subchannel.getAllAddresses()).thenReturn(eagList); + } + + final Map, Subchannel> subchannels2 = Maps.newHashMap(); + subchannels2.put(Arrays.asList(new EquivalentAddressGroup(removedAddr)), removedSubchannel); + subchannels2.put(Arrays.asList(new EquivalentAddressGroup(oldAddr)), oldSubchannel); List currentServers = Lists.newArrayList( @@ -197,7 +206,7 @@ public class RoundRobinLoadBalancerTest { Object[] args = invocation.getArguments(); return subchannels2.get(args[0]); } - }).when(mockHelper).createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); + }).when(mockHelper).createSubchannel(any(List.class), any(Attributes.class)); loadBalancer.handleResolvedAddressGroups(currentServers, affinity); @@ -214,8 +223,8 @@ public class RoundRobinLoadBalancerTest { oldSubchannel); subchannels2.clear(); - subchannels2.put(new EquivalentAddressGroup(oldAddr), oldSubchannel); - subchannels2.put(new EquivalentAddressGroup(newAddr), newSubchannel); + subchannels2.put(Arrays.asList(new EquivalentAddressGroup(oldAddr)), oldSubchannel); + subchannels2.put(Arrays.asList(new EquivalentAddressGroup(newAddr)), newSubchannel); List latestServers = Lists.newArrayList( @@ -233,8 +242,7 @@ public class RoundRobinLoadBalancerTest { assertThat(loadBalancer.getSubchannels()).containsExactly(oldSubchannel, newSubchannel); - verify(mockHelper, times(3)).createSubchannel(any(EquivalentAddressGroup.class), - any(Attributes.class)); + verify(mockHelper, times(3)).createSubchannel(any(List.class), any(Attributes.class)); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); picker = pickerCaptor.getValue(); @@ -250,6 +258,7 @@ public class RoundRobinLoadBalancerTest { verifyNoMoreInteractions(mockHelper); } + @SuppressWarnings("unchecked") @Test public void pickAfterStateChange() throws Exception { InOrder inOrder = inOrder(mockHelper); @@ -282,8 +291,7 @@ public class RoundRobinLoadBalancerTest { ConnectivityStateInfo.forNonError(IDLE)); verify(subchannel, times(2)).requestConnection(); - verify(mockHelper, times(3)).createSubchannel(any(EquivalentAddressGroup.class), - any(Attributes.class)); + verify(mockHelper, times(3)).createSubchannel(any(List.class), any(Attributes.class)); verifyNoMoreInteractions(mockHelper); } @@ -329,6 +337,7 @@ public class RoundRobinLoadBalancerTest { verifyNoMoreInteractions(mockHelper); } + @SuppressWarnings("unchecked") @Test public void nameResolutionErrorWithActiveChannels() throws Exception { final Subchannel readySubchannel = subchannels.values().iterator().next(); @@ -336,8 +345,7 @@ public class RoundRobinLoadBalancerTest { loadBalancer.handleSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY)); loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError")); - verify(mockHelper, times(3)).createSubchannel(any(EquivalentAddressGroup.class), - any(Attributes.class)); + verify(mockHelper, times(3)).createSubchannel(any(List.class), any(Attributes.class)); verify(mockHelper, times(3)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); diff --git a/grpclb/src/test/java/io/grpc/grpclb/CachedSubchannelPoolTest.java b/grpclb/src/test/java/io/grpc/grpclb/CachedSubchannelPoolTest.java index 9c5151f391..c41a0220c0 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/CachedSubchannelPoolTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/CachedSubchannelPoolTest.java @@ -19,6 +19,7 @@ package io.grpc.grpclb; import static com.google.common.truth.Truth.assertThat; import static io.grpc.grpclb.CachedSubchannelPool.SHUTDOWN_TIMEOUT_MS; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.doAnswer; @@ -39,6 +40,8 @@ import io.grpc.grpclb.CachedSubchannelPool.ShutdownSubchannelTask; import io.grpc.internal.FakeClock; import io.grpc.internal.SerializingExecutor; import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -73,19 +76,21 @@ public class CachedSubchannelPoolTest { private final ArrayList mockSubchannels = new ArrayList<>(); @Before + @SuppressWarnings("unchecked") public void setUp() { doAnswer(new Answer() { @Override public Subchannel answer(InvocationOnMock invocation) throws Throwable { Subchannel subchannel = mock(Subchannel.class); - EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0]; + List eagList = + (List) invocation.getArguments()[0]; Attributes attrs = (Attributes) invocation.getArguments()[1]; - when(subchannel.getAddresses()).thenReturn(eag); + when(subchannel.getAllAddresses()).thenReturn(eagList); when(subchannel.getAttributes()).thenReturn(attrs); mockSubchannels.add(subchannel); return subchannel; } - }).when(helper).createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); + }).when(helper).createSubchannel(any(List.class), any(Attributes.class)); doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { @@ -109,12 +114,12 @@ public class CachedSubchannelPoolTest { public void subchannelExpireAfterReturned() { Subchannel subchannel1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1); assertThat(subchannel1).isNotNull(); - verify(helper).createSubchannel(same(EAG1), same(ATTRS1)); + verify(helper).createSubchannel(eq(Arrays.asList(EAG1)), same(ATTRS1)); Subchannel subchannel2 = pool.takeOrCreateSubchannel(EAG2, ATTRS2); assertThat(subchannel2).isNotNull(); assertThat(subchannel2).isNotSameAs(subchannel1); - verify(helper).createSubchannel(same(EAG2), same(ATTRS2)); + verify(helper).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2)); pool.returnSubchannel(subchannel1); @@ -140,12 +145,12 @@ public class CachedSubchannelPoolTest { public void subchannelReused() { Subchannel subchannel1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1); assertThat(subchannel1).isNotNull(); - verify(helper).createSubchannel(same(EAG1), same(ATTRS1)); + verify(helper).createSubchannel(eq(Arrays.asList(EAG1)), same(ATTRS1)); Subchannel subchannel2 = pool.takeOrCreateSubchannel(EAG2, ATTRS2); assertThat(subchannel2).isNotNull(); assertThat(subchannel2).isNotSameAs(subchannel1); - verify(helper).createSubchannel(same(EAG2), same(ATTRS2)); + verify(helper).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2)); pool.returnSubchannel(subchannel1); @@ -167,7 +172,7 @@ public class CachedSubchannelPoolTest { // pool will create a new channel for EAG2 when requested Subchannel subchannel2a = pool.takeOrCreateSubchannel(EAG2, ATTRS2); assertThat(subchannel2a).isNotSameAs(subchannel2); - verify(helper, times(2)).createSubchannel(same(EAG2), same(ATTRS2)); + verify(helper, times(2)).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2)); // subchannel1 expires SHUTDOWN_TIMEOUT_MS after being returned pool.returnSubchannel(subchannel1a); diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index e540528108..177a8324c2 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -222,7 +222,7 @@ public class GrpclbLoadBalancerTest { Subchannel subchannel = mock(Subchannel.class); EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0]; Attributes attrs = (Attributes) invocation.getArguments()[1]; - when(subchannel.getAddresses()).thenReturn(eag); + when(subchannel.getAllAddresses()).thenReturn(Arrays.asList(eag)); when(subchannel.getAttributes()).thenReturn(attrs); mockSubchannels.add(subchannel); subchannelTracker.add(subchannel); @@ -262,6 +262,7 @@ public class GrpclbLoadBalancerTest { } @After + @SuppressWarnings("unchecked") public void tearDown() { try { if (balancer != null) { @@ -285,7 +286,7 @@ public class GrpclbLoadBalancerTest { verify(subchannel, never()).shutdown(); } verify(helper, never()) - .createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); + .createSubchannel(any(List.class), any(Attributes.class)); // No timer should linger after shutdown assertThat(fakeClock.getPendingTasks()).isEmpty(); } finally {