core: change API of NameResolver#onUpdate

Instead of `List<List<ResolvedServerInfo>>`, `onUpdate` now takes
`List<ResolvedServerInfoGroup>` as an argument and every `ResolvedServerInfoGroup`
object can have `Attributes` attached to it which means that we can provide
attributes on each level:

  * root level via `onUpdate` argument (applies to all servers)
  * group level via property of `ResolvedServerInfoGroup` (applies to all servers
    in the group)
  * host level via property of `ResolvedServerInfo` (applies to a single server)
This commit is contained in:
Lukasz Strzalkowski 2016-08-19 16:56:56 +02:00 committed by Kun Zhang
parent 577164c42f
commit 23652c5b03
16 changed files with 328 additions and 122 deletions

View File

@ -66,17 +66,17 @@ public abstract class LoadBalancer<T> {
public void shutdown() { } public void shutdown() { }
/** /**
* Handles newly resolved addresses and service config from name resolution system. Sublists * Handles newly resolved server groups and metadata attributes from name resolution system.
* should be considered equivalent with an {@link EquivalentAddressGroup}, but may be flattened * {@code servers} contained in {@link ResolvedServerInfoGroup} should be considered equivalent
* into a single list if needed. * but may be flattened into a single list if needed.
* *
* <p>Implementations should not modify the given {@code servers}. * <p>Implementations should not modify the given {@code servers}.
* *
* @param servers the resolved server addresses, never empty. * @param servers the resolved server addresses, never empty.
* @param config extra configuration data from naming system. * @param attributes extra metadata from naming system.
*/ */
public void handleResolvedAddresses(List<? extends List<ResolvedServerInfo>> servers, public void handleResolvedAddresses(List<ResolvedServerInfoGroup> servers,
Attributes config) { } Attributes attributes) { }
/** /**
* Handles an error from the name resolution system. * Handles an error from the name resolution system.

View File

@ -118,16 +118,15 @@ public abstract class NameResolver {
@ThreadSafe @ThreadSafe
public interface Listener { public interface Listener {
/** /**
* Handles updates on resolved addresses and config. * Handles updates on resolved addresses and attributes.
* *
* <p>Implementations will not modify the given {@code servers}. * <p>Implementations will not modify the given {@code servers}.
* *
* @param servers the resolved server addresses. Sublists should be considered to be * @param servers the resolved server groups, containing {@link ResolvedServerInfo} objects. An
* an {@link EquivalentAddressGroup}. An empty list or all sublists being empty * empty list will trigger {@link #onError}
* will trigger {@link #onError} * @param attributes extra metadata from naming system
* @param config extra configuration data from naming system
*/ */
void onUpdate(List<? extends List<ResolvedServerInfo>> servers, Attributes config); void onUpdate(List<ResolvedServerInfoGroup> servers, Attributes attributes);
/** /**
* Handles an error from the resolver. * Handles an error from the resolver.

View File

@ -106,21 +106,15 @@ public final class PickFirstBalancerFactory extends LoadBalancer.Factory {
} }
@Override @Override
public void handleResolvedAddresses( public void handleResolvedAddresses(List<ResolvedServerInfoGroup> updatedServers,
List<? extends List<ResolvedServerInfo>> updatedServers, Attributes config) { Attributes attributes) {
InterimTransport<T> savedInterimTransport; InterimTransport<T> savedInterimTransport;
final EquivalentAddressGroup newAddresses; final EquivalentAddressGroup newAddresses;
synchronized (lock) { synchronized (lock) {
if (closed) { if (closed) {
return; return;
} }
ArrayList<SocketAddress> newAddressList = new ArrayList<SocketAddress>(); newAddresses = resolvedServerInfoGroupsToEquivalentAddressGroup(updatedServers);
for (List<ResolvedServerInfo> servers : updatedServers) {
for (ResolvedServerInfo server : servers) {
newAddressList.add(server.getAddress());
}
}
newAddresses = new EquivalentAddressGroup(newAddressList);
if (newAddresses.equals(addresses)) { if (newAddresses.equals(addresses)) {
return; return;
} }
@ -170,5 +164,19 @@ public final class PickFirstBalancerFactory extends LoadBalancer.Factory {
savedInterimTransport.closeWithError(SHUTDOWN_STATUS); savedInterimTransport.closeWithError(SHUTDOWN_STATUS);
} }
} }
/**
* Converts list of ResolvedServerInfoGroup objects into one EquivalentAddressGroup object.
*/
private static EquivalentAddressGroup resolvedServerInfoGroupsToEquivalentAddressGroup(
List<ResolvedServerInfoGroup> groupList) {
List<SocketAddress> addrs = new ArrayList<SocketAddress>(groupList.size());
for (ResolvedServerInfoGroup group : groupList) {
for (ResolvedServerInfo srv : group.getResolvedServerInfoList()) {
addrs.add(srv.getAddress());
}
}
return new EquivalentAddressGroup(addrs);
}
} }
} }

View File

@ -0,0 +1,177 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import javax.annotation.concurrent.Immutable;
/**
* A group of {@link ResolvedServerInfo}s that is returned from a {@link NameResolver}.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
@Immutable
public final class ResolvedServerInfoGroup {
private final List<ResolvedServerInfo> resolvedServerInfoList;
private final Attributes attributes;
/**
* Constructs a new resolved server info group from {@link ResolvedServerInfo} list,
* with custom {@link Attributes} attached to it.
*
* @param resolvedServerInfoList list of resolved server info objects.
* @param attributes custom attributes for a given group.
*/
private ResolvedServerInfoGroup(List<ResolvedServerInfo> resolvedServerInfoList,
Attributes attributes) {
checkArgument(!resolvedServerInfoList.isEmpty(), "empty server list");
this.resolvedServerInfoList = Collections.unmodifiableList(resolvedServerInfoList);
this.attributes = checkNotNull(attributes, "attributes");
}
/**
* Returns immutable list of {@link ResolvedServerInfo} objects for this group.
*/
public List<ResolvedServerInfo> getResolvedServerInfoList() {
return resolvedServerInfoList;
}
/**
* Returns {@link Attributes} for this group.
*/
public Attributes getAttributes() {
return attributes;
}
/**
* Converts this group to {@link EquivalentAddressGroup} object.
*/
public EquivalentAddressGroup toEquivalentAddressGroup() {
List<SocketAddress> addrs = new ArrayList<SocketAddress>(resolvedServerInfoList.size());
for (ResolvedServerInfo resolvedServerInfo : resolvedServerInfoList) {
addrs.add(resolvedServerInfo.getAddress());
}
return new EquivalentAddressGroup(addrs);
}
/**
* Creates a new builder.
*/
public static Builder builder() {
return new Builder();
}
/**
* Creates a new builder for a group with extra attributes.
*/
public static Builder builder(Attributes attributes) {
return new Builder(attributes);
}
/**
* Returns true if the given object is also a {@link ResolvedServerInfoGroup} with an equal
* attributes and list of {@link ResolvedServerInfo} objects.
*
* @param o an object.
* @return true if the given object is a {@link ResolvedServerInfoGroup} with an equal attributes
* and server info list.
*/
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ResolvedServerInfoGroup that = (ResolvedServerInfoGroup) o;
return Objects.equal(resolvedServerInfoList, that.resolvedServerInfoList)
&& Objects.equal(attributes, that.attributes);
}
/**
* Returns a hash code for the resolved server info group.
*
* <p>Note that if a resolver includes mutable values in the attributes, this object's hash code
* could change over time. So care must be used when putting these objects into a set or using
* them as keys for a map.
*
* @return a hash code for the server info group.
*/
@Override
public int hashCode() {
return Objects.hashCode(resolvedServerInfoList, attributes);
}
@Override
public String toString() {
return "[servers=" + resolvedServerInfoList + ", attrs=" + attributes + "]";
}
public static class Builder {
private final ImmutableList.Builder<ResolvedServerInfo> groupBuilder;
private final Attributes attributes;
public Builder(Attributes attributes) {
this.groupBuilder = ImmutableList.builder();
this.attributes = attributes;
}
public Builder() {
this(Attributes.EMPTY);
}
public Builder add(ResolvedServerInfo resolvedServerInfo) {
groupBuilder.add(resolvedServerInfo);
return this;
}
public Builder addAll(Collection<ResolvedServerInfo> resolvedServerInfo) {
groupBuilder.addAll(resolvedServerInfo);
return this;
}
public ResolvedServerInfoGroup build() {
return new ResolvedServerInfoGroup(groupBuilder.build(), attributes);
}
}
}

View File

@ -48,6 +48,7 @@ import io.grpc.NameResolver;
import io.grpc.NameResolverProvider; import io.grpc.NameResolverProvider;
import io.grpc.PickFirstBalancerFactory; import io.grpc.PickFirstBalancerFactory;
import io.grpc.ResolvedServerInfo; import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.URI; import java.net.URI;
@ -320,9 +321,8 @@ public abstract class AbstractManagedChannelImplBuilder
@Override @Override
public void start(final Listener listener) { public void start(final Listener listener) {
listener.onUpdate( listener.onUpdate(Collections.singletonList(
Collections.singletonList( ResolvedServerInfoGroup.builder().add(new ResolvedServerInfo(address)).build()),
Collections.singletonList(new ResolvedServerInfo(address, Attributes.EMPTY))),
Attributes.EMPTY); Attributes.EMPTY);
} }

View File

@ -37,6 +37,7 @@ import com.google.common.base.Preconditions;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo; import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.SharedResourceHolder.Resource; import io.grpc.internal.SharedResourceHolder.Resource;
@ -44,9 +45,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -160,15 +159,13 @@ class DnsNameResolver extends NameResolver {
savedListener.onError(Status.UNAVAILABLE.withCause(e)); savedListener.onError(Status.UNAVAILABLE.withCause(e));
return; return;
} }
List<ResolvedServerInfo> servers = ResolvedServerInfoGroup.Builder servers = ResolvedServerInfoGroup.builder();
new ArrayList<ResolvedServerInfo>(inetAddrs.length);
for (int i = 0; i < inetAddrs.length; i++) { for (int i = 0; i < inetAddrs.length; i++) {
InetAddress inetAddr = inetAddrs[i]; InetAddress inetAddr = inetAddrs[i];
servers.add( servers.add(
new ResolvedServerInfo(new InetSocketAddress(inetAddr, port), Attributes.EMPTY)); new ResolvedServerInfo(new InetSocketAddress(inetAddr, port), Attributes.EMPTY));
} }
savedListener.onUpdate( savedListener.onUpdate(Collections.singletonList(servers.build()), Attributes.EMPTY);
Collections.singletonList(servers), Attributes.EMPTY);
} finally { } finally {
synchronized (DnsNameResolver.this) { synchronized (DnsNameResolver.this) {
resolving = false; resolving = false;

View File

@ -52,7 +52,7 @@ import io.grpc.LoadBalancer;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo; import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.TransportManager; import io.grpc.TransportManager;
import io.grpc.TransportManager.InterimTransport; import io.grpc.TransportManager.InterimTransport;
@ -355,16 +355,6 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
} }
} }
private static boolean serversAreEmpty(List<? extends List<ResolvedServerInfo>> servers) {
for (List<ResolvedServerInfo> serverInfos : servers) {
if (!serverInfos.isEmpty()) {
return false;
}
}
return true;
}
@VisibleForTesting @VisibleForTesting
static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory, static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory,
Attributes nameResolverParams) { Attributes nameResolverParams) {
@ -687,18 +677,19 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
} }
@Override @Override
public void onUpdate(List<? extends List<ResolvedServerInfo>> servers, Attributes config) { public void onUpdate(List<ResolvedServerInfoGroup> servers, Attributes config) {
if (serversAreEmpty(servers)) { if (servers.isEmpty()) {
onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list")); onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
} else { return;
try { }
balancer.handleResolvedAddresses(servers, config);
} catch (Throwable e) { try {
// It must be a bug! Push the exception back to LoadBalancer in the hope that it may be balancer.handleResolvedAddresses(servers, config);
// propagated to the application. } catch (Throwable e) {
balancer.handleNameResolutionError(Status.INTERNAL.withCause(e) // It must be a bug! Push the exception back to LoadBalancer in the hope that it may be
.withDescription("Thrown from handleResolvedAddresses(): " + e)); // propagated to the application.
} balancer.handleNameResolutionError(Status.INTERNAL.withCause(e)
.withDescription("Thrown from handleResolvedAddresses(): " + e));
} }
} }

View File

@ -40,6 +40,7 @@ import io.grpc.Status;
import io.grpc.TransportManager; import io.grpc.TransportManager;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -105,17 +106,29 @@ public class RoundRobinServerList<T> {
/** /**
* Adds a server to the list, or {@code null} for a drop entry. * Adds a server to the list, or {@code null} for a drop entry.
*/ */
public void add(@Nullable SocketAddress address) { public Builder<T> addSocketAddress(@Nullable SocketAddress address) {
listBuilder.add(new EquivalentAddressGroup(address)); listBuilder.add(new EquivalentAddressGroup(address));
return this;
} }
/** /**
* Adds a list of servers to the list grouped into a single {@link EquivalentAddressGroup}. * Adds a address group to the list.
* *
* @param addresses the addresses to add * @param addresses the addresses to add
*/ */
public void addList(List<SocketAddress> addresses) { public Builder<T> add(EquivalentAddressGroup addresses) {
listBuilder.add(new EquivalentAddressGroup(addresses)); listBuilder.add(addresses);
return this;
}
/**
* Adds a list of address groups.
*
* @param addresses the list of addresses group.
*/
public Builder<T> addAll(Collection<EquivalentAddressGroup> addresses) {
listBuilder.addAll(addresses);
return this;
} }
public RoundRobinServerList<T> build() { public RoundRobinServerList<T> build() {

View File

@ -38,13 +38,12 @@ import io.grpc.EquivalentAddressGroup;
import io.grpc.ExperimentalApi; import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo; import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.TransportManager; import io.grpc.TransportManager;
import io.grpc.TransportManager.InterimTransport; import io.grpc.TransportManager.InterimTransport;
import io.grpc.internal.RoundRobinServerList; import io.grpc.internal.RoundRobinServerList;
import java.net.SocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
@ -116,27 +115,16 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
} }
@Override @Override
public void handleResolvedAddresses( public void handleResolvedAddresses(List<ResolvedServerInfoGroup> updatedServers,
List<? extends List<ResolvedServerInfo>> updatedServers, Attributes config) { Attributes attributes) {
final InterimTransport<T> savedInterimTransport; final InterimTransport<T> savedInterimTransport;
final RoundRobinServerList<T> addressesCopy; final RoundRobinServerList<T> addressesCopy;
synchronized (lock) { synchronized (lock) {
if (closed) { if (closed) {
return; return;
} }
RoundRobinServerList.Builder<T> listBuilder = new RoundRobinServerList.Builder<T>(tm); addresses = new RoundRobinServerList.Builder<T>(tm).addAll(
for (List<ResolvedServerInfo> servers : updatedServers) { resolvedServerInfoGroupToEquivalentAddressGroup(updatedServers)).build();
if (servers.isEmpty()) {
continue;
}
final List<SocketAddress> socketAddresses = new ArrayList<SocketAddress>(servers.size());
for (ResolvedServerInfo server : servers) {
socketAddresses.add(server.getAddress());
}
listBuilder.addList(socketAddresses);
}
addresses = listBuilder.build();
addressesCopy = addresses; addressesCopy = addresses;
nameResolutionError = null; nameResolutionError = null;
savedInterimTransport = interimTransport; savedInterimTransport = interimTransport;
@ -183,5 +171,14 @@ public final class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
savedInterimTransport.closeWithError(SHUTDOWN_STATUS); savedInterimTransport.closeWithError(SHUTDOWN_STATUS);
} }
} }
private static List<EquivalentAddressGroup> resolvedServerInfoGroupToEquivalentAddressGroup(
List<ResolvedServerInfoGroup> groupList) {
List<EquivalentAddressGroup> addrs = new ArrayList<EquivalentAddressGroup>(groupList.size());
for (ResolvedServerInfoGroup group : groupList) {
addrs.add(group.toEquivalentAddressGroup());
}
return addrs;
}
} }
} }

View File

@ -41,6 +41,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import io.grpc.TransportManager.InterimTransport; import io.grpc.TransportManager.InterimTransport;
@ -54,7 +55,6 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List; import java.util.List;
/** Unit test for {@link PickFirstBalancerFactory}. */ /** Unit test for {@link PickFirstBalancerFactory}. */
@ -62,7 +62,7 @@ import java.util.List;
public class PickFirstBalancerTest { public class PickFirstBalancerTest {
private LoadBalancer<Transport> loadBalancer; private LoadBalancer<Transport> loadBalancer;
private List<List<ResolvedServerInfo>> servers; private List<ResolvedServerInfoGroup> servers;
private EquivalentAddressGroup addressGroup; private EquivalentAddressGroup addressGroup;
@Mock private TransportManager<Transport> mockTransportManager; @Mock private TransportManager<Transport> mockTransportManager;
@ -76,15 +76,15 @@ public class PickFirstBalancerTest {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
loadBalancer = PickFirstBalancerFactory.getInstance().newLoadBalancer( loadBalancer = PickFirstBalancerFactory.getInstance().newLoadBalancer(
"fakeservice", mockTransportManager); "fakeservice", mockTransportManager);
servers = new ArrayList<List<ResolvedServerInfo>>(); servers = Lists.newArrayList();
servers.add(new ArrayList<ResolvedServerInfo>()); List<ResolvedServerInfo> resolvedServerInfoList = Lists.newArrayList();
ArrayList<SocketAddress> addresses = new ArrayList<SocketAddress>();
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
SocketAddress addr = new FakeSocketAddress("server" + i); resolvedServerInfoList.add(new ResolvedServerInfo(new FakeSocketAddress("server" + i)));
servers.get(0).add(new ResolvedServerInfo(addr, Attributes.EMPTY));
addresses.add(addr);
} }
addressGroup = new EquivalentAddressGroup(addresses); ResolvedServerInfoGroup resolvedServerInfoGroup = ResolvedServerInfoGroup.builder().addAll(
resolvedServerInfoList).build();
servers.add(resolvedServerInfoGroup);
addressGroup = resolvedServerInfoGroup.toEquivalentAddressGroup();
when(mockTransportManager.getTransport(eq(addressGroup))).thenReturn(mockTransport); when(mockTransportManager.getTransport(eq(addressGroup))).thenReturn(mockTransport);
when(mockTransportManager.createInterimTransport()).thenReturn(mockInterimTransport); when(mockTransportManager.createInterimTransport()).thenReturn(mockInterimTransport);
when(mockInterimTransport.transport()).thenReturn(mockInterimTransportAsTransport); when(mockInterimTransport.transport()).thenReturn(mockInterimTransportAsTransport);

View File

@ -44,7 +44,7 @@ import com.google.common.collect.Iterables;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo; import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.SharedResourceHolder.Resource; import io.grpc.internal.SharedResourceHolder.Resource;
@ -107,7 +107,7 @@ public class DnsNameResolverTest {
@Mock @Mock
private NameResolver.Listener mockListener; private NameResolver.Listener mockListener;
@Captor @Captor
private ArgumentCaptor<List<List<ResolvedServerInfo>>> resultCaptor; private ArgumentCaptor<List<ResolvedServerInfoGroup>> resultCaptor;
@Captor @Captor
private ArgumentCaptor<Status> statusCaptor; private ArgumentCaptor<Status> statusCaptor;
@ -290,10 +290,11 @@ public class DnsNameResolverTest {
} }
private static void assertAnswerMatches(InetAddress[] addrs, int port, private static void assertAnswerMatches(InetAddress[] addrs, int port,
List<ResolvedServerInfo> result) { ResolvedServerInfoGroup result) {
assertEquals(addrs.length, result.size()); assertEquals(addrs.length, result.getResolvedServerInfoList().size());
for (int i = 0; i < addrs.length; i++) { for (int i = 0; i < addrs.length; i++) {
InetSocketAddress socketAddr = (InetSocketAddress) result.get(i).getAddress(); InetSocketAddress socketAddr = (InetSocketAddress) result.getResolvedServerInfoList().get(
i).getAddress();
assertEquals("Addr " + i, port, socketAddr.getPort()); assertEquals("Addr " + i, port, socketAddr.getPort());
assertEquals("Addr " + i, addrs[i], socketAddr.getAddress()); assertEquals("Addr " + i, addrs[i], socketAddr.getAddress());
} }

View File

@ -44,6 +44,8 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.ClientCall; import io.grpc.ClientCall;
@ -57,6 +59,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo; import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.StringMarshaller; import io.grpc.StringMarshaller;
import io.grpc.TransportManager.InterimTransport; import io.grpc.TransportManager.InterimTransport;
import io.grpc.TransportManager.OobTransportProvider; import io.grpc.TransportManager.OobTransportProvider;
@ -103,7 +106,7 @@ public class ManagedChannelImplIdlenessTest {
MethodDescriptor.MethodType.UNKNOWN, "/service/method", MethodDescriptor.MethodType.UNKNOWN, "/service/method",
new StringMarshaller(), new IntegerMarshaller()); new StringMarshaller(), new IntegerMarshaller());
private final List<List<ResolvedServerInfo>> servers = new ArrayList<List<ResolvedServerInfo>>(); private final List<ResolvedServerInfoGroup> servers = Lists.newArrayList();
private final List<EquivalentAddressGroup> addressGroupList = private final List<EquivalentAddressGroup> addressGroupList =
new ArrayList<EquivalentAddressGroup>(); new ArrayList<EquivalentAddressGroup>();
@ -139,14 +142,13 @@ public class ManagedChannelImplIdlenessTest {
newTransports = TestUtils.captureTransports(mockTransportFactory); newTransports = TestUtils.captureTransports(mockTransportFactory);
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
servers.add(new ArrayList<ResolvedServerInfo>()); ResolvedServerInfoGroup.Builder resolvedServerInfoGroup = ResolvedServerInfoGroup.builder();
ArrayList<SocketAddress> addresses = new ArrayList<SocketAddress>();
for (int j = 0; j < 2; j++) { for (int j = 0; j < 2; j++) {
SocketAddress addr = new FakeSocketAddress("servergroup" + i + "server" + j); resolvedServerInfoGroup.add(
servers.get(i).add(new ResolvedServerInfo(addr, Attributes.EMPTY)); new ResolvedServerInfo(new FakeSocketAddress("servergroup" + i + "server" + j)));
addresses.add(addr);
} }
addressGroupList.add(new EquivalentAddressGroup(addresses)); servers.add(resolvedServerInfoGroup.build());
addressGroupList.add(resolvedServerInfoGroup.build().toEquivalentAddressGroup());
} }
verify(mockNameResolverFactory).newNameResolver(any(URI.class), any(Attributes.class)); verify(mockNameResolverFactory).newNameResolver(any(URI.class), any(Attributes.class));
// Verify the initial idleness // Verify the initial idleness

View File

@ -53,6 +53,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.CallCredentials.MetadataApplier; import io.grpc.CallCredentials.MetadataApplier;
import io.grpc.CallCredentials; import io.grpc.CallCredentials;
@ -71,6 +73,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.PickFirstBalancerFactory; import io.grpc.PickFirstBalancerFactory;
import io.grpc.ResolvedServerInfo; import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.SecurityLevel; import io.grpc.SecurityLevel;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.StringMarshaller; import io.grpc.StringMarshaller;
@ -554,7 +557,7 @@ public class ManagedChannelImplTest {
String errorDescription = "NameResolver returned an empty list"; String errorDescription = "NameResolver returned an empty list";
// Name resolution is started as soon as channel is created // Name resolution is started as soon as channel is created
createChannel(new FakeNameResolverFactory(new ArrayList<ResolvedServerInfo>()), NO_INTERCEPTOR); createChannel(new FakeNameResolverFactory(), NO_INTERCEPTOR);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata()); call.start(mockCallListener, new Metadata());
timer.runDueTasks(); timer.runDueTasks();
@ -587,7 +590,7 @@ public class ManagedChannelImplTest {
assertEquals(1, loadBalancerFactory.balancers.size()); assertEquals(1, loadBalancerFactory.balancers.size());
LoadBalancer<?> loadBalancer = loadBalancerFactory.balancers.get(0); LoadBalancer<?> loadBalancer = loadBalancerFactory.balancers.get(0);
doThrow(ex).when(loadBalancer).handleResolvedAddresses( doThrow(ex).when(loadBalancer).handleResolvedAddresses(
Matchers.<List<List<ResolvedServerInfo>>>anyObject(), any(Attributes.class)); Matchers.<List<ResolvedServerInfoGroup>>anyObject(), any(Attributes.class));
// NameResolver returns addresses. // NameResolver returns addresses.
nameResolverFactory.allResolved(); nameResolverFactory.allResolved();
@ -936,18 +939,24 @@ public class ManagedChannelImplTest {
} }
private class FakeNameResolverFactory extends NameResolver.Factory { private class FakeNameResolverFactory extends NameResolver.Factory {
final List<ResolvedServerInfo> servers; final List<ResolvedServerInfoGroup> servers;
final boolean resolvedAtStart; final boolean resolvedAtStart;
final ArrayList<FakeNameResolver> resolvers = new ArrayList<FakeNameResolver>(); final ArrayList<FakeNameResolver> resolvers = new ArrayList<FakeNameResolver>();
FakeNameResolverFactory(boolean resolvedAtStart) { FakeNameResolverFactory(boolean resolvedAtStart) {
this.resolvedAtStart = resolvedAtStart; this.resolvedAtStart = resolvedAtStart;
servers = Collections.singletonList(server); servers = Collections.singletonList(ResolvedServerInfoGroup.builder().add(server).build());
} }
FakeNameResolverFactory(List<ResolvedServerInfo> servers) { FakeNameResolverFactory(List<ResolvedServerInfo> servers) {
resolvedAtStart = true; resolvedAtStart = true;
this.servers = servers; this.servers = Collections.singletonList(
ResolvedServerInfoGroup.builder().addAll(servers).build());
}
public FakeNameResolverFactory() {
resolvedAtStart = true;
this.servers = ImmutableList.of();
} }
@Override @Override
@ -988,7 +997,7 @@ public class ManagedChannelImplTest {
} }
void resolved() { void resolved() {
listener.onUpdate(Collections.singletonList(servers), Attributes.EMPTY); listener.onUpdate(servers, Attributes.EMPTY);
} }
@Override public void shutdown() { @Override public void shutdown() {

View File

@ -41,12 +41,14 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.ResolvedServerInfo; import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.TransportManager; import io.grpc.TransportManager;
import io.grpc.TransportManager.InterimTransport; import io.grpc.TransportManager.InterimTransport;
@ -63,7 +65,6 @@ import org.mockito.Mockito;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List; import java.util.List;
/** Unit test for {@link RoundRobinLoadBalancerFactory}. */ /** Unit test for {@link RoundRobinLoadBalancerFactory}. */
@ -71,7 +72,7 @@ import java.util.List;
public class RoundRobinLoadBalancerTest { public class RoundRobinLoadBalancerTest {
private LoadBalancer<Transport> loadBalancer; private LoadBalancer<Transport> loadBalancer;
private List<List<ResolvedServerInfo>> servers; private List<ResolvedServerInfoGroup> servers;
private List<EquivalentAddressGroup> addressGroupList; private List<EquivalentAddressGroup> addressGroupList;
@Mock private TransportManager<Transport> mockTransportManager; @Mock private TransportManager<Transport> mockTransportManager;
@ -87,17 +88,16 @@ public class RoundRobinLoadBalancerTest {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
loadBalancer = RoundRobinLoadBalancerFactory.getInstance().newLoadBalancer( loadBalancer = RoundRobinLoadBalancerFactory.getInstance().newLoadBalancer(
"fakeservice", mockTransportManager); "fakeservice", mockTransportManager);
addressGroupList = new ArrayList<EquivalentAddressGroup>(); addressGroupList = Lists.newArrayList();
servers = new ArrayList<List<ResolvedServerInfo>>(); servers = Lists.newArrayList();
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
servers.add(new ArrayList<ResolvedServerInfo>()); ResolvedServerInfoGroup.Builder resolvedServerInfoGroup = ResolvedServerInfoGroup.builder();
ArrayList<SocketAddress> addresses = new ArrayList<SocketAddress>();
for (int j = 0; j < 3; j++) { for (int j = 0; j < 3; j++) {
SocketAddress addr = new FakeSocketAddress("servergroup" + i + "server" + j); resolvedServerInfoGroup.add(
servers.get(i).add(new ResolvedServerInfo(addr, Attributes.EMPTY)); new ResolvedServerInfo(new FakeSocketAddress("servergroup" + i + "server" + j)));
addresses.add(addr);
} }
addressGroupList.add(new EquivalentAddressGroup(addresses)); servers.add(resolvedServerInfoGroup.build());
addressGroupList.add(resolvedServerInfoGroup.build().toEquivalentAddressGroup());
} }
when(mockTransportManager.getTransport(eq(addressGroupList.get(0)))) when(mockTransportManager.getTransport(eq(addressGroupList.get(0))))
.thenReturn(mockTransport0); .thenReturn(mockTransport0);

View File

@ -41,6 +41,7 @@ import io.grpc.Channel;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.ResolvedServerInfo; import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.TransportManager; import io.grpc.TransportManager;
import io.grpc.TransportManager.InterimTransport; import io.grpc.TransportManager.InterimTransport;
@ -150,19 +151,14 @@ class GrpclbLoadBalancer<T> extends LoadBalancer<T> {
} }
@Override @Override
public void handleResolvedAddresses( public void handleResolvedAddresses(List<ResolvedServerInfoGroup> updatedServers,
List<? extends List<ResolvedServerInfo>> updatedServers, Attributes config) { Attributes attributes) {
synchronized (lock) { synchronized (lock) {
if (closed) { if (closed) {
return; return;
} }
ArrayList<SocketAddress> addrs = new ArrayList<SocketAddress>(updatedServers.size()); EquivalentAddressGroup newLbAddresses = resolvedServerInfoGroupsToEquivalentAddressGroup(
for (List<ResolvedServerInfo> serverInfos : updatedServers) { updatedServers);
for (ResolvedServerInfo serverInfo : serverInfos) {
addrs.add(serverInfo.getAddress());
}
}
EquivalentAddressGroup newLbAddresses = new EquivalentAddressGroup(addrs);
if (!newLbAddresses.equals(lbAddresses)) { if (!newLbAddresses.equals(lbAddresses)) {
lbAddresses = newLbAddresses; lbAddresses = newLbAddresses;
connectToLb(); connectToLb();
@ -274,6 +270,20 @@ class GrpclbLoadBalancer<T> extends LoadBalancer<T> {
tm.updateRetainedTransports(addresses); tm.updateRetainedTransports(addresses);
} }
/**
* Converts list of ResolvedServerInfoGroup objects into one EquivalentAddressGroup object.
*/
private static EquivalentAddressGroup resolvedServerInfoGroupsToEquivalentAddressGroup(
List<ResolvedServerInfoGroup> groupList) {
List<SocketAddress> addrs = new ArrayList<SocketAddress>(groupList.size());
for (ResolvedServerInfoGroup group : groupList) {
for (ResolvedServerInfo srv : group.getResolvedServerInfoList()) {
addrs.add(srv.getAddress());
}
}
return new EquivalentAddressGroup(addrs);
}
private class LbResponseObserver implements StreamObserver<LoadBalanceResponse> { private class LbResponseObserver implements StreamObserver<LoadBalanceResponse> {
@Override public void onNext(LoadBalanceResponse response) { @Override public void onNext(LoadBalanceResponse response) {
logger.info("Got a LB response: " + response); logger.info("Got a LB response: " + response);
@ -286,12 +296,12 @@ class GrpclbLoadBalancer<T> extends LoadBalancer<T> {
// TODO(zhangkun83): honor expiration_interval // TODO(zhangkun83): honor expiration_interval
for (Server server : serverList.getServersList()) { for (Server server : serverList.getServersList()) {
if (server.getDropRequest()) { if (server.getDropRequest()) {
listBuilder.add(null); listBuilder.addSocketAddress(null);
} else { } else {
try { try {
InetSocketAddress address = new InetSocketAddress( InetSocketAddress address = new InetSocketAddress(
InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort()); InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort());
listBuilder.add(address); listBuilder.addSocketAddress(address);
// TODO(zhangkun83): fill the LB token to the attributes, and insert it to the // TODO(zhangkun83): fill the LB token to the attributes, and insert it to the
// application RPCs. // application RPCs.
if (!newServerMap.containsKey(address)) { if (!newServerMap.containsKey(address)) {

View File

@ -51,6 +51,7 @@ import com.google.protobuf.ByteString;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.ResolvedServerInfo; import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.TransportManager.InterimTransport; import io.grpc.TransportManager.InterimTransport;
import io.grpc.TransportManager; import io.grpc.TransportManager;
@ -431,7 +432,8 @@ public class GrpclbLoadBalancerTest {
Transport lbTransport = new Transport(); Transport lbTransport = new Transport();
when(mockTransportManager.getTransport(eq(lbAddressGroup))).thenReturn(lbTransport); when(mockTransportManager.getTransport(eq(lbAddressGroup))).thenReturn(lbTransport);
loadBalancer.handleResolvedAddresses( loadBalancer.handleResolvedAddresses(
Collections.singletonList(Collections.singletonList(lbServerInfo)), Attributes.EMPTY); Collections.singletonList(ResolvedServerInfoGroup.builder().add(lbServerInfo).build()),
Attributes.EMPTY);
verify(mockTransportManager).getTransport(eq(lbAddressGroup)); verify(mockTransportManager).getTransport(eq(lbAddressGroup));
return lbTransport; return lbTransport;
} }