core: emit lists of lists from NameResolver

This commit is contained in:
William Thurston 2016-06-09 08:30:55 -07:00 committed by Eric Anderson
parent 03d9450968
commit a05ab57561
12 changed files with 77 additions and 50 deletions

View File

@ -42,6 +42,8 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@ -154,14 +156,15 @@ class DnsNameResolver extends NameResolver {
savedListener.onError(Status.UNAVAILABLE.withCause(e));
return;
}
ArrayList<ResolvedServerInfo> servers =
List<ResolvedServerInfo> servers =
new ArrayList<ResolvedServerInfo>(inetAddrs.length);
for (int i = 0; i < inetAddrs.length; i++) {
InetAddress inetAddr = inetAddrs[i];
servers.add(
new ResolvedServerInfo(new InetSocketAddress(inetAddr, port), Attributes.EMPTY));
}
savedListener.onUpdate(servers, Attributes.EMPTY);
savedListener.onUpdate(
Collections.singletonList(servers), Attributes.EMPTY);
} finally {
synchronized (DnsNameResolver.this) {
resolving = false;

View File

@ -42,30 +42,30 @@ import java.util.List;
import javax.annotation.concurrent.GuardedBy;
/**
* A {@link LoadBalancer} that provides simple round-robin and pick-first routing mechanism over the
* addresses from the {@link NameResolver}.
* A {@link LoadBalancer} that provides no load balancing mechanism over the
* addresses from the {@link NameResolver}. The channel's default behavior
* (currently pick-first) is used for all addresses found.
*/
// TODO(zhangkun83): Only pick-first is implemented. We need to implement round-robin.
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory {
public final class DummyLoadBalancerFactory extends LoadBalancer.Factory {
private static final SimpleLoadBalancerFactory instance = new SimpleLoadBalancerFactory();
private static final DummyLoadBalancerFactory instance = new DummyLoadBalancerFactory();
private SimpleLoadBalancerFactory() {
private DummyLoadBalancerFactory() {
}
public static SimpleLoadBalancerFactory getInstance() {
public static DummyLoadBalancerFactory getInstance() {
return instance;
}
@Override
public <T> LoadBalancer<T> newLoadBalancer(String serviceName, TransportManager<T> tm) {
return new SimpleLoadBalancer<T>(tm);
return new DummyLoadBalancer<T>(tm);
}
private static class SimpleLoadBalancer<T> extends LoadBalancer<T> {
private static class DummyLoadBalancer<T> extends LoadBalancer<T> {
private static final Status SHUTDOWN_STATUS =
Status.UNAVAILABLE.augmentDescription("SimpleLoadBalancer has shut down");
Status.UNAVAILABLE.augmentDescription("DummyLoadBalancer has shut down");
private final Object lock = new Object();
@ -80,7 +80,7 @@ public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory {
private final TransportManager<T> tm;
private SimpleLoadBalancer(TransportManager<T> tm) {
private DummyLoadBalancer(TransportManager<T> tm) {
this.tm = tm;
}
@ -107,17 +107,18 @@ public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory {
@Override
public void handleResolvedAddresses(
List<ResolvedServerInfo> updatedServers, Attributes config) {
List<? extends List<ResolvedServerInfo>> updatedServers, Attributes config) {
InterimTransport<T> savedInterimTransport;
final EquivalentAddressGroup newAddresses;
synchronized (lock) {
if (closed) {
return;
}
ArrayList<SocketAddress> newAddressList =
new ArrayList<SocketAddress>(updatedServers.size());
for (ResolvedServerInfo server : updatedServers) {
newAddressList.add(server.getAddress());
ArrayList<SocketAddress> newAddressList = new ArrayList<SocketAddress>();
for (List<ResolvedServerInfo> servers : updatedServers) {
for (ResolvedServerInfo server : servers) {
newAddressList.add(server.getAddress());
}
}
newAddresses = new EquivalentAddressGroup(newAddressList);
if (newAddresses.equals(addresses)) {

View File

@ -66,14 +66,17 @@ public abstract class LoadBalancer<T> {
public void shutdown() { }
/**
* Handles newly resolved addresses and service config from name resolution system.
* Handles newly resolved addresses and service config from name resolution system. Sublists
* should be considered equivalent with an {@link EquivalentAddressGroup}, but may be flattened
* into a single list if needed.
*
* <p>Implementations should not modify the given {@code servers}.
*
* @param servers the resolved server addresses. Never empty.
* @param servers the resolved server addresses, never empty.
* @param config extra configuration data from naming system.
*/
public void handleResolvedAddresses(List<ResolvedServerInfo> servers, Attributes config) { }
public void handleResolvedAddresses(List<? extends List<ResolvedServerInfo>> servers,
Attributes config) { }
/**
* Handles an error from the name resolution system.

View File

@ -157,8 +157,8 @@ public abstract class ManagedChannelBuilder<T extends ManagedChannelBuilder<T>>
/**
* Provides a custom {@link LoadBalancer.Factory} for the channel.
*
* <p>If this method is not called, the builder will use {@link SimpleLoadBalancerFactory} for the
* channel.
* <p>If this method is not called, the builder will use {@link DummyLoadBalancerFactory}
* for the channel.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory);

View File

@ -120,10 +120,12 @@ public abstract class NameResolver {
*
* <p>Implementations will not modify the given {@code servers}.
*
* @param servers the resolved server addresses. An empty list will trigger {@link #onError}
* @param servers the resolved server addresses. Sublists should be considered to be
* an {@link EquivalentAddressGroup}. An empty list or all sublists being empty
* will trigger {@link #onError}
* @param config extra configuration data from naming system
*/
void onUpdate(List<ResolvedServerInfo> servers, Attributes config);
void onUpdate(List<? extends List<ResolvedServerInfo>> servers, Attributes config);
/**
* Handles an error from the resolver.

View File

@ -41,12 +41,12 @@ import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.DummyLoadBalancerFactory;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
import io.grpc.NameResolverRegistry;
import io.grpc.ResolvedServerInfo;
import io.grpc.SimpleLoadBalancerFactory;
import java.net.SocketAddress;
import java.net.URI;
@ -213,7 +213,7 @@ public abstract class AbstractManagedChannelImplBuilder
new ExponentialBackoffPolicy.Provider(),
firstNonNull(nameResolverFactory, NameResolverRegistry.getDefaultRegistry()),
getNameResolverParams(),
firstNonNull(loadBalancerFactory, SimpleLoadBalancerFactory.getInstance()),
firstNonNull(loadBalancerFactory, DummyLoadBalancerFactory.getInstance()),
transportFactory,
firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()),
@ -279,7 +279,8 @@ public abstract class AbstractManagedChannelImplBuilder
@Override
public void start(final Listener listener) {
listener.onUpdate(
Collections.singletonList(new ResolvedServerInfo(address, Attributes.EMPTY)),
Collections.singletonList(
Collections.singletonList(new ResolvedServerInfo(address, Attributes.EMPTY))),
Attributes.EMPTY);
}

View File

@ -175,8 +175,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
this.nameResolver.start(new NameResolver.Listener() {
@Override
public void onUpdate(List<ResolvedServerInfo> servers, Attributes config) {
if (servers.isEmpty()) {
public void onUpdate(List<? extends List<ResolvedServerInfo>> servers, Attributes config) {
if (serversAreEmpty(servers)) {
onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
} else {
try {
@ -201,6 +201,16 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
}
}
private static boolean serversAreEmpty(List<? extends List<ResolvedServerInfo>> servers) {
for (List<ResolvedServerInfo> serverInfos : servers) {
if (!serverInfos.isEmpty()) {
return false;
}
}
return true;
}
@VisibleForTesting
static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory,
Attributes nameResolverParams) {

View File

@ -40,6 +40,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.collect.Iterables;
import io.grpc.internal.FakeClock;
import io.grpc.internal.SharedResourceHolder.Resource;
@ -102,7 +104,7 @@ public class DnsNameResolverTest {
@Mock
private NameResolver.Listener mockListener;
@Captor
private ArgumentCaptor<List<ResolvedServerInfo>> resultCaptor;
private ArgumentCaptor<List<List<ResolvedServerInfo>>> resultCaptor;
@Captor
private ArgumentCaptor<Status> statusCaptor;
@ -149,14 +151,14 @@ public class DnsNameResolverTest {
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onUpdate(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, resolver.invocations.poll());
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
assertAnswerMatches(answer1, 81, Iterables.getOnlyElement(resultCaptor.getValue()));
assertEquals(0, fakeClock.numPendingTasks());
resolver.refresh();
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener, times(2)).onUpdate(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, resolver.invocations.poll());
assertAnswerMatches(answer2, 81, resultCaptor.getValue());
assertAnswerMatches(answer2, 81, Iterables.getOnlyElement(resultCaptor.getValue()));
assertEquals(0, fakeClock.numPendingTasks());
resolver.shutdown();
@ -201,7 +203,7 @@ public class DnsNameResolverTest {
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onUpdate(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, resolver.invocations.poll());
assertAnswerMatches(answer, 81, resultCaptor.getValue());
assertAnswerMatches(answer, 81, Iterables.getOnlyElement(resultCaptor.getValue()));
verifyNoMoreInteractions(mockListener);
}
@ -229,7 +231,7 @@ public class DnsNameResolverTest {
assertEquals(0, fakeClock.numPendingTasks());
verify(mockListener).onUpdate(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, resolver.invocations.poll());
assertAnswerMatches(answer, 81, resultCaptor.getValue());
assertAnswerMatches(answer, 81, Iterables.getOnlyElement(resultCaptor.getValue()));
verifyNoMoreInteractions(mockListener);
}

View File

@ -55,13 +55,14 @@ import org.mockito.MockitoAnnotations;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
/** Unit test for {@link SimpleLoadBalancerFactory}. */
/** Unit test for {@link DummyLoadBalancerFactory}. */
@RunWith(JUnit4.class)
public class SimpleLoadBalancerTest {
public class DummyLoadBalancerTest {
private LoadBalancer<Transport> loadBalancer;
private ArrayList<ResolvedServerInfo> servers;
private List<List<ResolvedServerInfo>> servers;
private EquivalentAddressGroup addressGroup;
@Mock private TransportManager<Transport> mockTransportManager;
@ -73,13 +74,14 @@ public class SimpleLoadBalancerTest {
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
loadBalancer = SimpleLoadBalancerFactory.getInstance().newLoadBalancer(
loadBalancer = DummyLoadBalancerFactory.getInstance().newLoadBalancer(
"fakeservice", mockTransportManager);
servers = new ArrayList<ResolvedServerInfo>();
servers = new ArrayList<List<ResolvedServerInfo>>();
servers.add(new ArrayList<ResolvedServerInfo>());
ArrayList<SocketAddress> addresses = new ArrayList<SocketAddress>();
for (int i = 0; i < 3; i++) {
SocketAddress addr = new FakeSocketAddress("server" + i);
servers.add(new ResolvedServerInfo(addr, Attributes.EMPTY));
servers.get(0).add(new ResolvedServerInfo(addr, Attributes.EMPTY));
addresses.add(addr);
}
addressGroup = new EquivalentAddressGroup(addresses);

View File

@ -62,6 +62,7 @@ import io.grpc.ClientInterceptor;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.DummyLoadBalancerFactory;
import io.grpc.IntegerMarshaller;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannel;
@ -69,7 +70,6 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo;
import io.grpc.SimpleLoadBalancerFactory;
import io.grpc.Status;
import io.grpc.StringMarshaller;
import io.grpc.TransportManager;
@ -120,7 +120,7 @@ public class ManagedChannelImplTest {
private final SocketAddress socketAddress = new SocketAddress() {};
private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY);
private SpyingLoadBalancerFactory loadBalancerFactory =
new SpyingLoadBalancerFactory(SimpleLoadBalancerFactory.getInstance());
new SpyingLoadBalancerFactory(DummyLoadBalancerFactory.getInstance());
@Rule public final ExpectedException thrown = ExpectedException.none();
@ -494,7 +494,7 @@ public class ManagedChannelImplTest {
}
@Test
public void nameResolverReturnsEmptyList() {
public void nameResolverReturnsEmptySubLists() {
String errorDescription = "NameResolver returned an empty list";
// Name resolution is started as soon as channel is created
@ -527,7 +527,7 @@ public class ManagedChannelImplTest {
assertEquals(1, loadBalancerFactory.balancers.size());
LoadBalancer<?> loadBalancer = loadBalancerFactory.balancers.get(0);
doThrow(ex).when(loadBalancer).handleResolvedAddresses(
Matchers.<List<ResolvedServerInfo>>anyObject(), any(Attributes.class));
Matchers.<List<List<ResolvedServerInfo>>>anyObject(), any(Attributes.class));
// NameResolver returns addresses.
nameResolverFactory.allResolved();
@ -806,7 +806,7 @@ public class ManagedChannelImplTest {
}
void resolved() {
listener.onUpdate(servers, Attributes.EMPTY);
listener.onUpdate(Collections.singletonList(servers), Attributes.EMPTY);
}
@Override public void shutdown() {

View File

@ -150,14 +150,16 @@ class GrpclbLoadBalancer<T> extends LoadBalancer<T> {
@Override
public void handleResolvedAddresses(
List<ResolvedServerInfo> updatedServers, Attributes config) {
List<? extends List<ResolvedServerInfo>> updatedServers, Attributes config) {
synchronized (lock) {
if (closed) {
return;
}
ArrayList<SocketAddress> addrs = new ArrayList<SocketAddress>(updatedServers.size());
for (ResolvedServerInfo serverInfo : updatedServers) {
addrs.add(serverInfo.getAddress());
for (List<ResolvedServerInfo> serverInfos : updatedServers) {
for (ResolvedServerInfo serverInfo : serverInfos) {
addrs.add(serverInfo.getAddress());
}
}
EquivalentAddressGroup newLbAddresses = new EquivalentAddressGroup(addrs);
if (!newLbAddresses.equals(lbAddresses)) {

View File

@ -430,7 +430,8 @@ public class GrpclbLoadBalancerTest {
lbAddressGroup = buildAddressGroup(lbServerInfo);
Transport lbTransport = new Transport();
when(mockTransportManager.getTransport(eq(lbAddressGroup))).thenReturn(lbTransport);
loadBalancer.handleResolvedAddresses(Collections.singletonList(lbServerInfo), Attributes.EMPTY);
loadBalancer.handleResolvedAddresses(
Collections.singletonList(Collections.singletonList(lbServerInfo)), Attributes.EMPTY);
verify(mockTransportManager).getTransport(eq(lbAddressGroup));
return lbTransport;
}