mirror of https://github.com/grpc/grpc-java.git
core: finalize convenient overrides on LoadBalancer.Helper and Subchannel. (#4954)
Those overrides are kept for backward compatibility and convenience for callers. Documentation already says implementations should not override them. Making them final reduces confusion around which override should be verified in tests and be overridden in forwarding classes, thus prevents bugs caused by such confusion.
This commit is contained in:
parent
60b02c0b9c
commit
6adc1797c9
|
|
@ -454,20 +454,12 @@ public abstract class LoadBalancer {
|
|||
@ThreadSafe
|
||||
public abstract static class Helper {
|
||||
/**
|
||||
* Creates a Subchannel, which is a logical connection to the given group of addresses which are
|
||||
* considered equivalent. The {@code attrs} are custom attributes associated with this
|
||||
* Subchannel, and can be accessed later through {@link Subchannel#getAttributes
|
||||
* Subchannel.getAttributes()}.
|
||||
*
|
||||
* <p>The LoadBalancer is responsible for closing unused Subchannels, and closing all
|
||||
* Subchannels within {@link #shutdown}.
|
||||
*
|
||||
* <p>The default implementation calls {@link #createSubchannel(List, Attributes)}.
|
||||
* Implementations should not override this method.
|
||||
* Equivalent to {@link #createSubchannel(List, Attributes)} with the given single {@code
|
||||
* EquivalentAddressGroup}.
|
||||
*
|
||||
* @since 1.2.0
|
||||
*/
|
||||
public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
|
||||
public final Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
|
||||
Preconditions.checkNotNull(addrs, "addrs");
|
||||
return createSubchannel(Collections.singletonList(addrs), attrs);
|
||||
}
|
||||
|
|
@ -489,18 +481,12 @@ public abstract class LoadBalancer {
|
|||
}
|
||||
|
||||
/**
|
||||
* Replaces the existing addresses used with {@code subchannel}. This method is superior to
|
||||
* {@link #createSubchannel} when the new and old addresses overlap, since the subchannel can
|
||||
* continue using an existing connection.
|
||||
* Equivalent to {@link #updateSubchannelAddresses(io.grpc.LoadBalancer.Subchannel, List)} with
|
||||
* the given single {@code EquivalentAddressGroup}.
|
||||
*
|
||||
* <p>The default implementation calls {@link #updateSubchannelAddresses(
|
||||
* LoadBalancer.Subchannel, List)}. Implementations should not override this method.
|
||||
*
|
||||
* @throws IllegalArgumentException if {@code subchannel} was not returned from {@link
|
||||
* #createSubchannel}
|
||||
* @since 1.4.0
|
||||
*/
|
||||
public void updateSubchannelAddresses(
|
||||
public final void updateSubchannelAddresses(
|
||||
Subchannel subchannel, EquivalentAddressGroup addrs) {
|
||||
Preconditions.checkNotNull(addrs, "addrs");
|
||||
updateSubchannelAddresses(subchannel, Collections.singletonList(addrs));
|
||||
|
|
@ -622,17 +608,15 @@ public abstract class LoadBalancer {
|
|||
public abstract void requestConnection();
|
||||
|
||||
/**
|
||||
* Returns the addresses that this Subchannel is bound to. The default implementation calls
|
||||
* getAllAddresses().
|
||||
*
|
||||
* <p>The default implementation calls {@link #getAllAddresses()}. Implementations should not
|
||||
* override this method.
|
||||
* Returns the addresses that this Subchannel is bound to. This can be called only if
|
||||
* the Subchannel has only one {@link EquivalentAddressGroup}. Under the hood it calls
|
||||
* {@link #getAllAddresses}.
|
||||
*
|
||||
* @throws IllegalStateException if this subchannel has more than one EquivalentAddressGroup.
|
||||
* Use getAllAddresses() instead
|
||||
* Use {@link #getAllAddresses} instead
|
||||
* @since 1.2.0
|
||||
*/
|
||||
public EquivalentAddressGroup getAddresses() {
|
||||
public final EquivalentAddressGroup getAddresses() {
|
||||
List<EquivalentAddressGroup> groups = getAllAddresses();
|
||||
Preconditions.checkState(groups.size() == 1, "Does not have exactly one group");
|
||||
return groups.get(0);
|
||||
|
|
|
|||
|
|
@ -35,22 +35,11 @@ public abstract class ForwardingLoadBalancerHelper extends LoadBalancer.Helper {
|
|||
*/
|
||||
protected abstract LoadBalancer.Helper delegate();
|
||||
|
||||
@Override
|
||||
public Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
|
||||
return delegate().createSubchannel(addrs, attrs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
|
||||
return delegate().createSubchannel(addrs, attrs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateSubchannelAddresses(
|
||||
Subchannel subchannel, EquivalentAddressGroup addrs) {
|
||||
delegate().updateSubchannelAddresses(subchannel, addrs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateSubchannelAddresses(
|
||||
Subchannel subchannel, List<EquivalentAddressGroup> addrs) {
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ public class PickFirstLoadBalancerTest {
|
|||
socketAddresses.add(addr);
|
||||
}
|
||||
|
||||
when(mockSubchannel.getAddresses()).thenThrow(new UnsupportedOperationException());
|
||||
when(mockSubchannel.getAllAddresses()).thenThrow(new UnsupportedOperationException());
|
||||
when(mockHelper.createSubchannel(
|
||||
anyListOf(EquivalentAddressGroup.class), any(Attributes.class)))
|
||||
.thenReturn(mockSubchannel);
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ import org.mockito.stubbing.Answer;
|
|||
public class RoundRobinLoadBalancerTest {
|
||||
private RoundRobinLoadBalancer loadBalancer;
|
||||
private List<EquivalentAddressGroup> servers = Lists.newArrayList();
|
||||
private Map<EquivalentAddressGroup, Subchannel> subchannels = Maps.newLinkedHashMap();
|
||||
private Map<List<EquivalentAddressGroup>, Subchannel> subchannels = Maps.newLinkedHashMap();
|
||||
private static final Attributes.Key<String> MAJOR_KEY = Attributes.Key.create("major-key");
|
||||
private Attributes affinity = Attributes.newBuilder().set(MAJOR_KEY, "I got the keys").build();
|
||||
|
||||
|
|
@ -100,7 +100,7 @@ public class RoundRobinLoadBalancerTest {
|
|||
@Captor
|
||||
private ArgumentCaptor<ConnectivityState> stateCaptor;
|
||||
@Captor
|
||||
private ArgumentCaptor<EquivalentAddressGroup> eagCaptor;
|
||||
private ArgumentCaptor<List<EquivalentAddressGroup>> eagListCaptor;
|
||||
@Mock
|
||||
private Helper mockHelper;
|
||||
|
||||
|
|
@ -108,6 +108,7 @@ public class RoundRobinLoadBalancerTest {
|
|||
private PickSubchannelArgs mockArgs;
|
||||
|
||||
@Before
|
||||
@SuppressWarnings("unchecked")
|
||||
public void setUp() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
|
||||
|
|
@ -116,11 +117,11 @@ public class RoundRobinLoadBalancerTest {
|
|||
EquivalentAddressGroup eag = new EquivalentAddressGroup(addr);
|
||||
servers.add(eag);
|
||||
Subchannel sc = mock(Subchannel.class);
|
||||
when(sc.getAddresses()).thenReturn(eag);
|
||||
subchannels.put(eag, sc);
|
||||
when(sc.getAllAddresses()).thenReturn(Arrays.asList(eag));
|
||||
subchannels.put(Arrays.asList(eag), sc);
|
||||
}
|
||||
|
||||
when(mockHelper.createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)))
|
||||
when(mockHelper.createSubchannel(any(List.class), any(Attributes.class)))
|
||||
.then(new Answer<Subchannel>() {
|
||||
@Override
|
||||
public Subchannel answer(InvocationOnMock invocation) throws Throwable {
|
||||
|
|
@ -146,10 +147,10 @@ public class RoundRobinLoadBalancerTest {
|
|||
loadBalancer.handleResolvedAddressGroups(servers, affinity);
|
||||
loadBalancer.handleSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
|
||||
|
||||
verify(mockHelper, times(3)).createSubchannel(eagCaptor.capture(),
|
||||
verify(mockHelper, times(3)).createSubchannel(eagListCaptor.capture(),
|
||||
any(Attributes.class));
|
||||
|
||||
assertThat(eagCaptor.getAllValues()).containsAllIn(subchannels.keySet());
|
||||
assertThat(eagListCaptor.getAllValues()).containsAllIn(subchannels.keySet());
|
||||
for (Subchannel subchannel : subchannels.values()) {
|
||||
verify(subchannel).requestConnection();
|
||||
verify(subchannel, never()).shutdown();
|
||||
|
|
@ -165,26 +166,34 @@ public class RoundRobinLoadBalancerTest {
|
|||
verifyNoMoreInteractions(mockHelper);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void pickAfterResolvedUpdatedHosts() throws Exception {
|
||||
Subchannel removedSubchannel = mock(Subchannel.class);
|
||||
Subchannel oldSubchannel = mock(Subchannel.class);
|
||||
Subchannel newSubchannel = mock(Subchannel.class);
|
||||
|
||||
for (Subchannel subchannel : Lists.newArrayList(removedSubchannel, oldSubchannel,
|
||||
newSubchannel)) {
|
||||
when(subchannel.getAttributes()).thenReturn(Attributes.newBuilder().set(STATE_INFO,
|
||||
new Ref<ConnectivityStateInfo>(
|
||||
ConnectivityStateInfo.forNonError(READY))).build());
|
||||
}
|
||||
|
||||
FakeSocketAddress removedAddr = new FakeSocketAddress("removed");
|
||||
FakeSocketAddress oldAddr = new FakeSocketAddress("old");
|
||||
FakeSocketAddress newAddr = new FakeSocketAddress("new");
|
||||
|
||||
final Map<EquivalentAddressGroup, Subchannel> subchannels2 = Maps.newHashMap();
|
||||
subchannels2.put(new EquivalentAddressGroup(removedAddr), removedSubchannel);
|
||||
subchannels2.put(new EquivalentAddressGroup(oldAddr), oldSubchannel);
|
||||
List<Subchannel> allSubchannels =
|
||||
Lists.newArrayList(removedSubchannel, oldSubchannel, newSubchannel);
|
||||
List<FakeSocketAddress> allAddrs =
|
||||
Lists.newArrayList(removedAddr, oldAddr, newAddr);
|
||||
for (int i = 0; i < allSubchannels.size(); i++) {
|
||||
Subchannel subchannel = allSubchannels.get(i);
|
||||
List<EquivalentAddressGroup> eagList =
|
||||
Arrays.asList(new EquivalentAddressGroup(allAddrs.get(i)));
|
||||
when(subchannel.getAttributes()).thenReturn(Attributes.newBuilder().set(STATE_INFO,
|
||||
new Ref<ConnectivityStateInfo>(
|
||||
ConnectivityStateInfo.forNonError(READY))).build());
|
||||
when(subchannel.getAllAddresses()).thenReturn(eagList);
|
||||
}
|
||||
|
||||
final Map<List<EquivalentAddressGroup>, Subchannel> subchannels2 = Maps.newHashMap();
|
||||
subchannels2.put(Arrays.asList(new EquivalentAddressGroup(removedAddr)), removedSubchannel);
|
||||
subchannels2.put(Arrays.asList(new EquivalentAddressGroup(oldAddr)), oldSubchannel);
|
||||
|
||||
List<EquivalentAddressGroup> currentServers =
|
||||
Lists.newArrayList(
|
||||
|
|
@ -197,7 +206,7 @@ public class RoundRobinLoadBalancerTest {
|
|||
Object[] args = invocation.getArguments();
|
||||
return subchannels2.get(args[0]);
|
||||
}
|
||||
}).when(mockHelper).createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
|
||||
}).when(mockHelper).createSubchannel(any(List.class), any(Attributes.class));
|
||||
|
||||
loadBalancer.handleResolvedAddressGroups(currentServers, affinity);
|
||||
|
||||
|
|
@ -214,8 +223,8 @@ public class RoundRobinLoadBalancerTest {
|
|||
oldSubchannel);
|
||||
|
||||
subchannels2.clear();
|
||||
subchannels2.put(new EquivalentAddressGroup(oldAddr), oldSubchannel);
|
||||
subchannels2.put(new EquivalentAddressGroup(newAddr), newSubchannel);
|
||||
subchannels2.put(Arrays.asList(new EquivalentAddressGroup(oldAddr)), oldSubchannel);
|
||||
subchannels2.put(Arrays.asList(new EquivalentAddressGroup(newAddr)), newSubchannel);
|
||||
|
||||
List<EquivalentAddressGroup> latestServers =
|
||||
Lists.newArrayList(
|
||||
|
|
@ -233,8 +242,7 @@ public class RoundRobinLoadBalancerTest {
|
|||
assertThat(loadBalancer.getSubchannels()).containsExactly(oldSubchannel,
|
||||
newSubchannel);
|
||||
|
||||
verify(mockHelper, times(3)).createSubchannel(any(EquivalentAddressGroup.class),
|
||||
any(Attributes.class));
|
||||
verify(mockHelper, times(3)).createSubchannel(any(List.class), any(Attributes.class));
|
||||
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
|
||||
|
||||
picker = pickerCaptor.getValue();
|
||||
|
|
@ -250,6 +258,7 @@ public class RoundRobinLoadBalancerTest {
|
|||
verifyNoMoreInteractions(mockHelper);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void pickAfterStateChange() throws Exception {
|
||||
InOrder inOrder = inOrder(mockHelper);
|
||||
|
|
@ -282,8 +291,7 @@ public class RoundRobinLoadBalancerTest {
|
|||
ConnectivityStateInfo.forNonError(IDLE));
|
||||
|
||||
verify(subchannel, times(2)).requestConnection();
|
||||
verify(mockHelper, times(3)).createSubchannel(any(EquivalentAddressGroup.class),
|
||||
any(Attributes.class));
|
||||
verify(mockHelper, times(3)).createSubchannel(any(List.class), any(Attributes.class));
|
||||
verifyNoMoreInteractions(mockHelper);
|
||||
}
|
||||
|
||||
|
|
@ -329,6 +337,7 @@ public class RoundRobinLoadBalancerTest {
|
|||
verifyNoMoreInteractions(mockHelper);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void nameResolutionErrorWithActiveChannels() throws Exception {
|
||||
final Subchannel readySubchannel = subchannels.values().iterator().next();
|
||||
|
|
@ -336,8 +345,7 @@ public class RoundRobinLoadBalancerTest {
|
|||
loadBalancer.handleSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
|
||||
loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));
|
||||
|
||||
verify(mockHelper, times(3)).createSubchannel(any(EquivalentAddressGroup.class),
|
||||
any(Attributes.class));
|
||||
verify(mockHelper, times(3)).createSubchannel(any(List.class), any(Attributes.class));
|
||||
verify(mockHelper, times(3))
|
||||
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
|
||||
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package io.grpc.grpclb;
|
|||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static io.grpc.grpclb.CachedSubchannelPool.SHUTDOWN_TIMEOUT_MS;
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Mockito.atMost;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
|
|
@ -39,6 +40,8 @@ import io.grpc.grpclb.CachedSubchannelPool.ShutdownSubchannelTask;
|
|||
import io.grpc.internal.FakeClock;
|
||||
import io.grpc.internal.SerializingExecutor;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
|
@ -73,19 +76,21 @@ public class CachedSubchannelPoolTest {
|
|||
private final ArrayList<Subchannel> mockSubchannels = new ArrayList<>();
|
||||
|
||||
@Before
|
||||
@SuppressWarnings("unchecked")
|
||||
public void setUp() {
|
||||
doAnswer(new Answer<Subchannel>() {
|
||||
@Override
|
||||
public Subchannel answer(InvocationOnMock invocation) throws Throwable {
|
||||
Subchannel subchannel = mock(Subchannel.class);
|
||||
EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0];
|
||||
List<EquivalentAddressGroup> eagList =
|
||||
(List<EquivalentAddressGroup>) invocation.getArguments()[0];
|
||||
Attributes attrs = (Attributes) invocation.getArguments()[1];
|
||||
when(subchannel.getAddresses()).thenReturn(eag);
|
||||
when(subchannel.getAllAddresses()).thenReturn(eagList);
|
||||
when(subchannel.getAttributes()).thenReturn(attrs);
|
||||
mockSubchannels.add(subchannel);
|
||||
return subchannel;
|
||||
}
|
||||
}).when(helper).createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
|
||||
}).when(helper).createSubchannel(any(List.class), any(Attributes.class));
|
||||
doAnswer(new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
|
|
@ -109,12 +114,12 @@ public class CachedSubchannelPoolTest {
|
|||
public void subchannelExpireAfterReturned() {
|
||||
Subchannel subchannel1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1);
|
||||
assertThat(subchannel1).isNotNull();
|
||||
verify(helper).createSubchannel(same(EAG1), same(ATTRS1));
|
||||
verify(helper).createSubchannel(eq(Arrays.asList(EAG1)), same(ATTRS1));
|
||||
|
||||
Subchannel subchannel2 = pool.takeOrCreateSubchannel(EAG2, ATTRS2);
|
||||
assertThat(subchannel2).isNotNull();
|
||||
assertThat(subchannel2).isNotSameAs(subchannel1);
|
||||
verify(helper).createSubchannel(same(EAG2), same(ATTRS2));
|
||||
verify(helper).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2));
|
||||
|
||||
pool.returnSubchannel(subchannel1);
|
||||
|
||||
|
|
@ -140,12 +145,12 @@ public class CachedSubchannelPoolTest {
|
|||
public void subchannelReused() {
|
||||
Subchannel subchannel1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1);
|
||||
assertThat(subchannel1).isNotNull();
|
||||
verify(helper).createSubchannel(same(EAG1), same(ATTRS1));
|
||||
verify(helper).createSubchannel(eq(Arrays.asList(EAG1)), same(ATTRS1));
|
||||
|
||||
Subchannel subchannel2 = pool.takeOrCreateSubchannel(EAG2, ATTRS2);
|
||||
assertThat(subchannel2).isNotNull();
|
||||
assertThat(subchannel2).isNotSameAs(subchannel1);
|
||||
verify(helper).createSubchannel(same(EAG2), same(ATTRS2));
|
||||
verify(helper).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2));
|
||||
|
||||
pool.returnSubchannel(subchannel1);
|
||||
|
||||
|
|
@ -167,7 +172,7 @@ public class CachedSubchannelPoolTest {
|
|||
// pool will create a new channel for EAG2 when requested
|
||||
Subchannel subchannel2a = pool.takeOrCreateSubchannel(EAG2, ATTRS2);
|
||||
assertThat(subchannel2a).isNotSameAs(subchannel2);
|
||||
verify(helper, times(2)).createSubchannel(same(EAG2), same(ATTRS2));
|
||||
verify(helper, times(2)).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2));
|
||||
|
||||
// subchannel1 expires SHUTDOWN_TIMEOUT_MS after being returned
|
||||
pool.returnSubchannel(subchannel1a);
|
||||
|
|
|
|||
|
|
@ -222,7 +222,7 @@ public class GrpclbLoadBalancerTest {
|
|||
Subchannel subchannel = mock(Subchannel.class);
|
||||
EquivalentAddressGroup eag = (EquivalentAddressGroup) invocation.getArguments()[0];
|
||||
Attributes attrs = (Attributes) invocation.getArguments()[1];
|
||||
when(subchannel.getAddresses()).thenReturn(eag);
|
||||
when(subchannel.getAllAddresses()).thenReturn(Arrays.asList(eag));
|
||||
when(subchannel.getAttributes()).thenReturn(attrs);
|
||||
mockSubchannels.add(subchannel);
|
||||
subchannelTracker.add(subchannel);
|
||||
|
|
@ -262,6 +262,7 @@ public class GrpclbLoadBalancerTest {
|
|||
}
|
||||
|
||||
@After
|
||||
@SuppressWarnings("unchecked")
|
||||
public void tearDown() {
|
||||
try {
|
||||
if (balancer != null) {
|
||||
|
|
@ -285,7 +286,7 @@ public class GrpclbLoadBalancerTest {
|
|||
verify(subchannel, never()).shutdown();
|
||||
}
|
||||
verify(helper, never())
|
||||
.createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
|
||||
.createSubchannel(any(List.class), any(Attributes.class));
|
||||
// No timer should linger after shutdown
|
||||
assertThat(fakeClock.getPendingTasks()).isEmpty();
|
||||
} finally {
|
||||
|
|
|
|||
Loading…
Reference in New Issue