From a05ab57561c5752a7fc32a854130a15f8d7041e3 Mon Sep 17 00:00:00 2001 From: William Thurston Date: Thu, 9 Jun 2016 08:30:55 -0700 Subject: [PATCH] core: emit lists of lists from NameResolver --- .../main/java/io/grpc/DnsNameResolver.java | 7 ++-- ...ory.java => DummyLoadBalancerFactory.java} | 33 ++++++++++--------- core/src/main/java/io/grpc/LoadBalancer.java | 9 +++-- .../java/io/grpc/ManagedChannelBuilder.java | 4 +-- core/src/main/java/io/grpc/NameResolver.java | 6 ++-- .../AbstractManagedChannelImplBuilder.java | 7 ++-- .../io/grpc/internal/ManagedChannelImpl.java | 14 ++++++-- .../java/io/grpc/DnsNameResolverTest.java | 12 ++++--- ...erTest.java => DummyLoadBalancerTest.java} | 14 ++++---- .../grpc/internal/ManagedChannelImplTest.java | 10 +++--- .../io/grpc/grpclb/GrpclbLoadBalancer.java | 8 +++-- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 3 +- 12 files changed, 77 insertions(+), 50 deletions(-) rename core/src/main/java/io/grpc/{SimpleLoadBalancerFactory.java => DummyLoadBalancerFactory.java} (81%) rename core/src/test/java/io/grpc/{SimpleLoadBalancerTest.java => DummyLoadBalancerTest.java} (92%) diff --git a/core/src/main/java/io/grpc/DnsNameResolver.java b/core/src/main/java/io/grpc/DnsNameResolver.java index ae5fa04578..8d5b211e02 100644 --- a/core/src/main/java/io/grpc/DnsNameResolver.java +++ b/core/src/main/java/io/grpc/DnsNameResolver.java @@ -42,6 +42,8 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -154,14 +156,15 @@ class DnsNameResolver extends NameResolver { savedListener.onError(Status.UNAVAILABLE.withCause(e)); return; } - ArrayList servers = + List servers = new ArrayList(inetAddrs.length); for (int i = 0; i < inetAddrs.length; i++) { InetAddress inetAddr = inetAddrs[i]; servers.add( new ResolvedServerInfo(new InetSocketAddress(inetAddr, port), Attributes.EMPTY)); } - savedListener.onUpdate(servers, Attributes.EMPTY); + savedListener.onUpdate( + Collections.singletonList(servers), Attributes.EMPTY); } finally { synchronized (DnsNameResolver.this) { resolving = false; diff --git a/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java b/core/src/main/java/io/grpc/DummyLoadBalancerFactory.java similarity index 81% rename from core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java rename to core/src/main/java/io/grpc/DummyLoadBalancerFactory.java index ebeff49582..a8bc19b4ac 100644 --- a/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/DummyLoadBalancerFactory.java @@ -42,30 +42,30 @@ import java.util.List; import javax.annotation.concurrent.GuardedBy; /** - * A {@link LoadBalancer} that provides simple round-robin and pick-first routing mechanism over the - * addresses from the {@link NameResolver}. + * A {@link LoadBalancer} that provides no load balancing mechanism over the + * addresses from the {@link NameResolver}. The channel's default behavior + * (currently pick-first) is used for all addresses found. */ -// TODO(zhangkun83): Only pick-first is implemented. We need to implement round-robin. @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771") -public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory { +public final class DummyLoadBalancerFactory extends LoadBalancer.Factory { - private static final SimpleLoadBalancerFactory instance = new SimpleLoadBalancerFactory(); + private static final DummyLoadBalancerFactory instance = new DummyLoadBalancerFactory(); - private SimpleLoadBalancerFactory() { + private DummyLoadBalancerFactory() { } - public static SimpleLoadBalancerFactory getInstance() { + public static DummyLoadBalancerFactory getInstance() { return instance; } @Override public LoadBalancer newLoadBalancer(String serviceName, TransportManager tm) { - return new SimpleLoadBalancer(tm); + return new DummyLoadBalancer(tm); } - private static class SimpleLoadBalancer extends LoadBalancer { + private static class DummyLoadBalancer extends LoadBalancer { private static final Status SHUTDOWN_STATUS = - Status.UNAVAILABLE.augmentDescription("SimpleLoadBalancer has shut down"); + Status.UNAVAILABLE.augmentDescription("DummyLoadBalancer has shut down"); private final Object lock = new Object(); @@ -80,7 +80,7 @@ public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory { private final TransportManager tm; - private SimpleLoadBalancer(TransportManager tm) { + private DummyLoadBalancer(TransportManager tm) { this.tm = tm; } @@ -107,17 +107,18 @@ public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory { @Override public void handleResolvedAddresses( - List updatedServers, Attributes config) { + List> updatedServers, Attributes config) { InterimTransport savedInterimTransport; final EquivalentAddressGroup newAddresses; synchronized (lock) { if (closed) { return; } - ArrayList newAddressList = - new ArrayList(updatedServers.size()); - for (ResolvedServerInfo server : updatedServers) { - newAddressList.add(server.getAddress()); + ArrayList newAddressList = new ArrayList(); + for (List servers : updatedServers) { + for (ResolvedServerInfo server : servers) { + newAddressList.add(server.getAddress()); + } } newAddresses = new EquivalentAddressGroup(newAddressList); if (newAddresses.equals(addresses)) { diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java index 12b720e089..e0a8447e85 100644 --- a/core/src/main/java/io/grpc/LoadBalancer.java +++ b/core/src/main/java/io/grpc/LoadBalancer.java @@ -66,14 +66,17 @@ public abstract class LoadBalancer { public void shutdown() { } /** - * Handles newly resolved addresses and service config from name resolution system. + * Handles newly resolved addresses and service config from name resolution system. Sublists + * should be considered equivalent with an {@link EquivalentAddressGroup}, but may be flattened + * into a single list if needed. * *

Implementations should not modify the given {@code servers}. * - * @param servers the resolved server addresses. Never empty. + * @param servers the resolved server addresses, never empty. * @param config extra configuration data from naming system. */ - public void handleResolvedAddresses(List servers, Attributes config) { } + public void handleResolvedAddresses(List> servers, + Attributes config) { } /** * Handles an error from the name resolution system. diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java index 93bce2fc1d..bae1cae116 100644 --- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java +++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java @@ -157,8 +157,8 @@ public abstract class ManagedChannelBuilder> /** * Provides a custom {@link LoadBalancer.Factory} for the channel. * - *

If this method is not called, the builder will use {@link SimpleLoadBalancerFactory} for the - * channel. + *

If this method is not called, the builder will use {@link DummyLoadBalancerFactory} + * for the channel. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771") public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory); diff --git a/core/src/main/java/io/grpc/NameResolver.java b/core/src/main/java/io/grpc/NameResolver.java index 2699fdecd4..0189c523c4 100644 --- a/core/src/main/java/io/grpc/NameResolver.java +++ b/core/src/main/java/io/grpc/NameResolver.java @@ -120,10 +120,12 @@ public abstract class NameResolver { * *

Implementations will not modify the given {@code servers}. * - * @param servers the resolved server addresses. An empty list will trigger {@link #onError} + * @param servers the resolved server addresses. Sublists should be considered to be + * an {@link EquivalentAddressGroup}. An empty list or all sublists being empty + * will trigger {@link #onError} * @param config extra configuration data from naming system */ - void onUpdate(List servers, Attributes config); + void onUpdate(List> servers, Attributes config); /** * Handles an error from the resolver. diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 4cb74e9b98..32dbd95bca 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -41,12 +41,12 @@ import io.grpc.Attributes; import io.grpc.ClientInterceptor; import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; +import io.grpc.DummyLoadBalancerFactory; import io.grpc.LoadBalancer; import io.grpc.ManagedChannelBuilder; import io.grpc.NameResolver; import io.grpc.NameResolverRegistry; import io.grpc.ResolvedServerInfo; -import io.grpc.SimpleLoadBalancerFactory; import java.net.SocketAddress; import java.net.URI; @@ -213,7 +213,7 @@ public abstract class AbstractManagedChannelImplBuilder new ExponentialBackoffPolicy.Provider(), firstNonNull(nameResolverFactory, NameResolverRegistry.getDefaultRegistry()), getNameResolverParams(), - firstNonNull(loadBalancerFactory, SimpleLoadBalancerFactory.getInstance()), + firstNonNull(loadBalancerFactory, DummyLoadBalancerFactory.getInstance()), transportFactory, firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()), firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()), @@ -279,7 +279,8 @@ public abstract class AbstractManagedChannelImplBuilder @Override public void start(final Listener listener) { listener.onUpdate( - Collections.singletonList(new ResolvedServerInfo(address, Attributes.EMPTY)), + Collections.singletonList( + Collections.singletonList(new ResolvedServerInfo(address, Attributes.EMPTY))), Attributes.EMPTY); } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 3618565825..d9d97fcaae 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -175,8 +175,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI this.nameResolver.start(new NameResolver.Listener() { @Override - public void onUpdate(List servers, Attributes config) { - if (servers.isEmpty()) { + public void onUpdate(List> servers, Attributes config) { + if (serversAreEmpty(servers)) { onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list")); } else { try { @@ -201,6 +201,16 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI } } + private static boolean serversAreEmpty(List> servers) { + for (List serverInfos : servers) { + if (!serverInfos.isEmpty()) { + return false; + } + } + + return true; + } + @VisibleForTesting static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory, Attributes nameResolverParams) { diff --git a/core/src/test/java/io/grpc/DnsNameResolverTest.java b/core/src/test/java/io/grpc/DnsNameResolverTest.java index f9c2f316b9..3a3b7d933d 100644 --- a/core/src/test/java/io/grpc/DnsNameResolverTest.java +++ b/core/src/test/java/io/grpc/DnsNameResolverTest.java @@ -40,6 +40,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import com.google.common.collect.Iterables; + import io.grpc.internal.FakeClock; import io.grpc.internal.SharedResourceHolder.Resource; @@ -102,7 +104,7 @@ public class DnsNameResolverTest { @Mock private NameResolver.Listener mockListener; @Captor - private ArgumentCaptor> resultCaptor; + private ArgumentCaptor>> resultCaptor; @Captor private ArgumentCaptor statusCaptor; @@ -149,14 +151,14 @@ public class DnsNameResolverTest { assertEquals(1, fakeExecutor.runDueTasks()); verify(mockListener).onUpdate(resultCaptor.capture(), any(Attributes.class)); assertEquals(name, resolver.invocations.poll()); - assertAnswerMatches(answer1, 81, resultCaptor.getValue()); + assertAnswerMatches(answer1, 81, Iterables.getOnlyElement(resultCaptor.getValue())); assertEquals(0, fakeClock.numPendingTasks()); resolver.refresh(); assertEquals(1, fakeExecutor.runDueTasks()); verify(mockListener, times(2)).onUpdate(resultCaptor.capture(), any(Attributes.class)); assertEquals(name, resolver.invocations.poll()); - assertAnswerMatches(answer2, 81, resultCaptor.getValue()); + assertAnswerMatches(answer2, 81, Iterables.getOnlyElement(resultCaptor.getValue())); assertEquals(0, fakeClock.numPendingTasks()); resolver.shutdown(); @@ -201,7 +203,7 @@ public class DnsNameResolverTest { assertEquals(1, fakeExecutor.runDueTasks()); verify(mockListener).onUpdate(resultCaptor.capture(), any(Attributes.class)); assertEquals(name, resolver.invocations.poll()); - assertAnswerMatches(answer, 81, resultCaptor.getValue()); + assertAnswerMatches(answer, 81, Iterables.getOnlyElement(resultCaptor.getValue())); verifyNoMoreInteractions(mockListener); } @@ -229,7 +231,7 @@ public class DnsNameResolverTest { assertEquals(0, fakeClock.numPendingTasks()); verify(mockListener).onUpdate(resultCaptor.capture(), any(Attributes.class)); assertEquals(name, resolver.invocations.poll()); - assertAnswerMatches(answer, 81, resultCaptor.getValue()); + assertAnswerMatches(answer, 81, Iterables.getOnlyElement(resultCaptor.getValue())); verifyNoMoreInteractions(mockListener); } diff --git a/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java b/core/src/test/java/io/grpc/DummyLoadBalancerTest.java similarity index 92% rename from core/src/test/java/io/grpc/SimpleLoadBalancerTest.java rename to core/src/test/java/io/grpc/DummyLoadBalancerTest.java index a8e4100161..ad4bd0919f 100644 --- a/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/DummyLoadBalancerTest.java @@ -55,13 +55,14 @@ import org.mockito.MockitoAnnotations; import java.net.SocketAddress; import java.util.ArrayList; +import java.util.List; -/** Unit test for {@link SimpleLoadBalancerFactory}. */ +/** Unit test for {@link DummyLoadBalancerFactory}. */ @RunWith(JUnit4.class) -public class SimpleLoadBalancerTest { +public class DummyLoadBalancerTest { private LoadBalancer loadBalancer; - private ArrayList servers; + private List> servers; private EquivalentAddressGroup addressGroup; @Mock private TransportManager mockTransportManager; @@ -73,13 +74,14 @@ public class SimpleLoadBalancerTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - loadBalancer = SimpleLoadBalancerFactory.getInstance().newLoadBalancer( + loadBalancer = DummyLoadBalancerFactory.getInstance().newLoadBalancer( "fakeservice", mockTransportManager); - servers = new ArrayList(); + servers = new ArrayList>(); + servers.add(new ArrayList()); ArrayList addresses = new ArrayList(); for (int i = 0; i < 3; i++) { SocketAddress addr = new FakeSocketAddress("server" + i); - servers.add(new ResolvedServerInfo(addr, Attributes.EMPTY)); + servers.get(0).add(new ResolvedServerInfo(addr, Attributes.EMPTY)); addresses.add(addr); } addressGroup = new EquivalentAddressGroup(addresses); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 2073e8c038..9245a11808 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -62,6 +62,7 @@ import io.grpc.ClientInterceptor; import io.grpc.Compressor; import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; +import io.grpc.DummyLoadBalancerFactory; import io.grpc.IntegerMarshaller; import io.grpc.LoadBalancer; import io.grpc.ManagedChannel; @@ -69,7 +70,6 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.NameResolver; import io.grpc.ResolvedServerInfo; -import io.grpc.SimpleLoadBalancerFactory; import io.grpc.Status; import io.grpc.StringMarshaller; import io.grpc.TransportManager; @@ -120,7 +120,7 @@ public class ManagedChannelImplTest { private final SocketAddress socketAddress = new SocketAddress() {}; private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY); private SpyingLoadBalancerFactory loadBalancerFactory = - new SpyingLoadBalancerFactory(SimpleLoadBalancerFactory.getInstance()); + new SpyingLoadBalancerFactory(DummyLoadBalancerFactory.getInstance()); @Rule public final ExpectedException thrown = ExpectedException.none(); @@ -494,7 +494,7 @@ public class ManagedChannelImplTest { } @Test - public void nameResolverReturnsEmptyList() { + public void nameResolverReturnsEmptySubLists() { String errorDescription = "NameResolver returned an empty list"; // Name resolution is started as soon as channel is created @@ -527,7 +527,7 @@ public class ManagedChannelImplTest { assertEquals(1, loadBalancerFactory.balancers.size()); LoadBalancer loadBalancer = loadBalancerFactory.balancers.get(0); doThrow(ex).when(loadBalancer).handleResolvedAddresses( - Matchers.>anyObject(), any(Attributes.class)); + Matchers.>>anyObject(), any(Attributes.class)); // NameResolver returns addresses. nameResolverFactory.allResolved(); @@ -806,7 +806,7 @@ public class ManagedChannelImplTest { } void resolved() { - listener.onUpdate(servers, Attributes.EMPTY); + listener.onUpdate(Collections.singletonList(servers), Attributes.EMPTY); } @Override public void shutdown() { diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index 90c73a66c5..22f4a86db7 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -150,14 +150,16 @@ class GrpclbLoadBalancer extends LoadBalancer { @Override public void handleResolvedAddresses( - List updatedServers, Attributes config) { + List> updatedServers, Attributes config) { synchronized (lock) { if (closed) { return; } ArrayList addrs = new ArrayList(updatedServers.size()); - for (ResolvedServerInfo serverInfo : updatedServers) { - addrs.add(serverInfo.getAddress()); + for (List serverInfos : updatedServers) { + for (ResolvedServerInfo serverInfo : serverInfos) { + addrs.add(serverInfo.getAddress()); + } } EquivalentAddressGroup newLbAddresses = new EquivalentAddressGroup(addrs); if (!newLbAddresses.equals(lbAddresses)) { diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index d2f8878393..c37ef1ae2c 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -430,7 +430,8 @@ public class GrpclbLoadBalancerTest { lbAddressGroup = buildAddressGroup(lbServerInfo); Transport lbTransport = new Transport(); when(mockTransportManager.getTransport(eq(lbAddressGroup))).thenReturn(lbTransport); - loadBalancer.handleResolvedAddresses(Collections.singletonList(lbServerInfo), Attributes.EMPTY); + loadBalancer.handleResolvedAddresses( + Collections.singletonList(Collections.singletonList(lbServerInfo)), Attributes.EMPTY); verify(mockTransportManager).getTransport(eq(lbAddressGroup)); return lbTransport; }