core: unify EquivalentAddressGroup and its immitators. (#2755)

Resolves #2716

- Add attributes to EquivalentAddressGroup
- Deprecate ResolvedServerInfoGroup by EquivalentAddressGroup
- Deprecate ResolvedServerInfo, because attributes for a single address
  with an address group is not found to be useful.
- The changes on the NameResolver and LoadBalancer interfaces are backward-compatible
  in the next release, with which implementors can switch to the new API smoothly.

As a related change, redefine the semantics of DnsNameResolver and
RoundRobinLoadBalancer:

- Before: DnsNameResolver returns all addresses in one address group.
  RoundRobinLoadBalancer ignores the grouping of addresses and
  round-robin on every single addresses.  It doesn't work well with the
  one-server-multiple-address setup, e.g., both IPv4 and IPv6 addresses
  are returned for a single serve, even if they are put in the same
  address group by the NameResolver.

- After: DnsNameResolver returns every address in its own
  EAG. RoundRobinLoadBalancer takes an EAG as a whole, and only
  round-robin on the list of EAGs. The new behavior is a better
  interpretation of the EAGs, and really allows the case where one
  server has more than one addresses (e.g., IPv4 and IPv6).

This change will affect users that use custom LoadBalancer with the
stock DnsNameResolver, and those who use custom NameResolver with the
stock RoundRobinLoadBalancer.

Users who use both the stock DnsNameResolver and RoundRobinLoadBalancer
or PickFirstBalancer will see no behavioral change. Because they will
still round-robin on individual addresses from DNS, or do pick-first on
all addresses from DNS (PickFirstBalancer flattens all addresses).

The result is a simpler API and reduction of boilderplates.
This commit is contained in:
Kun Zhang 2017-03-22 18:29:31 -07:00 committed by GitHub
parent 3ffa5a9660
commit 418d52d16d
18 changed files with 268 additions and 357 deletions

View File

@ -48,6 +48,7 @@ import java.util.List;
public final class EquivalentAddressGroup {
private final List<SocketAddress> addrs;
private final Attributes attrs;
/**
* {@link SocketAddress} docs say that the addresses are immutable, so we cache the hashCode.
@ -55,20 +56,36 @@ public final class EquivalentAddressGroup {
private final int hashCode;
/**
* List constructor.
* List constructor without {@link Attributes}.
*/
public EquivalentAddressGroup(List<SocketAddress> addrs) {
this(addrs, Attributes.EMPTY);
}
/**
* List constructor with {@link Attributes}.
*/
public EquivalentAddressGroup(List<SocketAddress> addrs, Attributes attrs) {
Preconditions.checkArgument(!addrs.isEmpty(), "addrs is empty");
this.addrs = Collections.unmodifiableList(new ArrayList<SocketAddress>(addrs));
this.attrs = Preconditions.checkNotNull(attrs, "attrs");
// Attributes may contain mutable objects, which means Attributes' hashCode may change over
// time, thus we don't cache Attributes' hashCode.
hashCode = this.addrs.hashCode();
}
/**
* Singleton constructor.
* Singleton constructor without Attributes.
*/
public EquivalentAddressGroup(SocketAddress addr) {
this.addrs = Collections.singletonList(addr);
hashCode = addrs.hashCode();
this(addr, Attributes.EMPTY);
}
/**
* Singleton constructor with Attributes.
*/
public EquivalentAddressGroup(SocketAddress addr, Attributes attrs) {
this(Collections.singletonList(addr), attrs);
}
/**
@ -78,9 +95,16 @@ public final class EquivalentAddressGroup {
return addrs;
}
/**
* Returns the attributes.
*/
public Attributes getAttributes() {
return attrs;
}
@Override
public String toString() {
return addrs.toString();
return "[addrs=" + addrs + ", attrs=" + attrs + "]";
}
@Override
@ -89,6 +113,14 @@ public final class EquivalentAddressGroup {
return hashCode;
}
/**
* Returns true if the given object is also an {@link EquivalentAddressGroup} with an equal
* address list and equal attribute values.
*
* <p>Note that if the attributes include mutable values, it is possible for two objects to be
* considered equal at one point in time and not equal at another (due to concurrent mutation of
* attribute values).
*/
@Override
public boolean equals(Object other) {
if (!(other instanceof EquivalentAddressGroup)) {
@ -104,6 +136,9 @@ public final class EquivalentAddressGroup {
return false;
}
}
if (!attrs.equals(that.attrs)) {
return false;
}
return true;
}
}

View File

@ -34,6 +34,8 @@ package io.grpc;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
@ -117,11 +119,44 @@ public abstract class LoadBalancer {
*
* <p>Implementations should not modify the given {@code servers}.
*
* @deprecated Implement {@link #handleResolvedAddressGroups} instead. As it is deprecated, the
* {@link ResolvedServerInfo}s from the passed-in {@link ResolvedServerInfoGroup}s
* lose all their attributes.
*
* @param servers the resolved server addresses, never empty.
* @param attributes extra metadata from naming system.
*/
public abstract void handleResolvedAddresses(
List<ResolvedServerInfoGroup> servers, Attributes attributes);
@Deprecated
public void handleResolvedAddresses(
List<ResolvedServerInfoGroup> servers, Attributes attributes) {
throw new UnsupportedOperationException("This is deprecated and should not be called");
}
/**
* Handles newly resolved server groups and metadata attributes from name resolution system.
* {@code servers} contained in {@link EquivalentAddressGroup} should be considered equivalent
* but may be flattened into a single list if needed.
*
* <p>Implementations should not modify the given {@code servers}.
*
* @param servers the resolved server addresses, never empty.
* @param attributes extra metadata from naming system.
*/
@SuppressWarnings("deprecation")
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) {
ArrayList<ResolvedServerInfoGroup> serverInfoGroups =
new ArrayList<ResolvedServerInfoGroup>(servers.size());
for (EquivalentAddressGroup eag : servers) {
ResolvedServerInfoGroup.Builder serverInfoGroupBuilder =
ResolvedServerInfoGroup.builder(eag.getAttributes());
for (SocketAddress addr : eag.getAddresses()) {
serverInfoGroupBuilder.add(new ResolvedServerInfo(addr));
}
serverInfoGroups.add(serverInfoGroupBuilder.build());
}
handleResolvedAddresses(serverInfoGroups, attributes);
}
/**
* Handles an error from the name resolution system.

View File

@ -123,12 +123,24 @@ public abstract class NameResolver {
*
* <p>Implementations will not modify the given {@code servers}.
*
* @deprecated call {@link #onAddresses} instead
* @param servers the resolved server groups, containing {@link ResolvedServerInfo} objects. An
* empty list will trigger {@link #onError}
* @param attributes extra metadata from naming system
*/
@Deprecated
void onUpdate(List<ResolvedServerInfoGroup> servers, Attributes attributes);
/**
* Handles updates on resolved addresses and attributes.
*
* <p>Implementations will not modify the given {@code servers}.
*
* @param servers the resolved server addresses. An empty list will trigger {@link #onError}
* @param attributes extra metadata from naming system
*/
void onAddresses(List<EquivalentAddressGroup> servers, Attributes attributes);
/**
* Handles an error from the resolver.
*

View File

@ -74,13 +74,12 @@ public final class PickFirstBalancerFactory extends LoadBalancer.Factory {
}
@Override
public void handleResolvedAddresses(
List<ResolvedServerInfoGroup> servers, Attributes attributes) {
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) {
// Flatten servers list received from name resolver into single address group. This means that
// as far as load balancer is concerned, there's virtually one single server with multiple
// addresses so the connection will be created only for the first address (pick first).
EquivalentAddressGroup newEag =
flattenResolvedServerInfoGroupsIntoEquivalentAddressGroup(servers);
EquivalentAddressGroup newEag = flattenEquivalentAddressGroup(servers);
if (subchannel == null || !newEag.equals(subchannel.getAddresses())) {
if (subchannel != null) {
subchannel.shutdown();
@ -136,19 +135,18 @@ public final class PickFirstBalancerFactory extends LoadBalancer.Factory {
}
/**
* Flattens list of ResolvedServerInfoGroup objects into one EquivalentAddressGroup object.
* Flattens list of EquivalentAddressGroup objects into one EquivalentAddressGroup object.
*/
private static EquivalentAddressGroup flattenResolvedServerInfoGroupsIntoEquivalentAddressGroup(
List<ResolvedServerInfoGroup> groupList) {
private static EquivalentAddressGroup flattenEquivalentAddressGroup(
List<EquivalentAddressGroup> groupList) {
List<SocketAddress> addrs = new ArrayList<SocketAddress>();
for (ResolvedServerInfoGroup group : groupList) {
for (ResolvedServerInfo srv : group.getResolvedServerInfoList()) {
addrs.add(srv.getAddress());
for (EquivalentAddressGroup group : groupList) {
for (SocketAddress addr : group.getAddresses()) {
addrs.add(addr);
}
}
return new EquivalentAddressGroup(addrs);
}
}
/**

View File

@ -39,7 +39,10 @@ import javax.annotation.concurrent.Immutable;
/**
* The information about a server from a {@link NameResolver}.
*
* @deprecated This class will be removed along with {@link ResolvedServerInfoGroup}.
*/
@Deprecated
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
@Immutable
public final class ResolvedServerInfo {

View File

@ -44,7 +44,10 @@ import javax.annotation.concurrent.Immutable;
/**
* A group of {@link ResolvedServerInfo}s that is returned from a {@link NameResolver}.
*
* @deprecated This class will be removed. Use {@link EquivalentAddressGroup} instead.
*/
@Deprecated
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1770")
@Immutable
public final class ResolvedServerInfoGroup {
@ -88,7 +91,7 @@ public final class ResolvedServerInfoGroup {
for (ResolvedServerInfo resolvedServerInfo : resolvedServerInfoList) {
addrs.add(resolvedServerInfo.getAddress());
}
return new EquivalentAddressGroup(addrs);
return new EquivalentAddressGroup(addrs, attributes);
}
/**

View File

@ -43,14 +43,13 @@ import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
import io.grpc.PickFirstBalancerFactory;
import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
@ -373,8 +372,8 @@ public abstract class AbstractManagedChannelImplBuilder
@Override
public void start(final Listener listener) {
listener.onUpdate(Collections.singletonList(
ResolvedServerInfoGroup.builder().add(new ResolvedServerInfo(address)).build()),
listener.onAddresses(
Collections.singletonList(new EquivalentAddressGroup(address)),
Attributes.EMPTY);
}

View File

@ -34,15 +34,15 @@ package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status;
import io.grpc.internal.SharedResourceHolder.Resource;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@ -54,6 +54,9 @@ import javax.annotation.concurrent.GuardedBy;
/**
* A DNS-based {@link NameResolver}.
*
* <p>Each {@code A} or {@code AAAA} record emits an {@link EquivalentAddressGroup} in the list
* passed to {@link NameResolver.Listener#onUpdate}
*
* @see DnsNameResolverFactory
*/
class DnsNameResolver extends NameResolver {
@ -141,11 +144,9 @@ class DnsNameResolver extends NameResolver {
}
try {
if (System.getenv("GRPC_PROXY_EXP") != null) {
ResolvedServerInfoGroup servers = ResolvedServerInfoGroup.builder()
.add(new ResolvedServerInfo(
InetSocketAddress.createUnresolved(host, port), Attributes.EMPTY))
.build();
savedListener.onUpdate(Collections.singletonList(servers), Attributes.EMPTY);
EquivalentAddressGroup server =
new EquivalentAddressGroup(InetSocketAddress.createUnresolved(host, port));
savedListener.onAddresses(Collections.singletonList(server), Attributes.EMPTY);
return;
}
@ -165,13 +166,13 @@ class DnsNameResolver extends NameResolver {
savedListener.onError(Status.UNAVAILABLE.withCause(e));
return;
}
ResolvedServerInfoGroup.Builder servers = ResolvedServerInfoGroup.builder();
// Each address forms an EAG
ArrayList<EquivalentAddressGroup> servers = new ArrayList<EquivalentAddressGroup>();
for (int i = 0; i < inetAddrs.length; i++) {
InetAddress inetAddr = inetAddrs[i];
servers.add(
new ResolvedServerInfo(new InetSocketAddress(inetAddr, port), Attributes.EMPTY));
servers.add(new EquivalentAddressGroup(new InetSocketAddress(inetAddr, port)));
}
savedListener.onUpdate(Collections.singletonList(servers.build()), Attributes.EMPTY);
savedListener.onAddresses(servers, Attributes.EMPTY);
} finally {
synchronized (DnsNameResolver.this) {
resolving = false;

View File

@ -58,11 +58,11 @@ import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -741,8 +741,20 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
this.helper = helperImpl;
}
@Deprecated
@Override
public void onUpdate(final List<ResolvedServerInfoGroup> servers, final Attributes config) {
public void onUpdate(
final List<io.grpc.ResolvedServerInfoGroup> servers, final Attributes config) {
ArrayList<EquivalentAddressGroup> eags =
new ArrayList<EquivalentAddressGroup>(servers.size());
for (io.grpc.ResolvedServerInfoGroup infoGroup : servers) {
eags.add(infoGroup.toEquivalentAddressGroup());
}
onAddresses(eags, config);
}
@Override
public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) {
if (servers.isEmpty()) {
onError(Status.UNAVAILABLE.withDescription("NameResolver returned an empty list"));
return;
@ -756,7 +768,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
return;
}
try {
balancer.handleResolvedAddresses(servers, config);
balancer.handleResolvedAddressGroups(servers, config);
} catch (Throwable e) {
log.log(
Level.WARNING, "[" + getLogId() + "] Unexpected exception from LoadBalancer", e);

View File

@ -47,8 +47,6 @@ import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
@ -101,11 +99,10 @@ public class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
}
@Override
public void handleResolvedAddresses(
List<ResolvedServerInfoGroup> servers, Attributes attributes) {
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) {
Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet();
Set<EquivalentAddressGroup> latestAddrs =
resolvedServerInfoGroupToEquivalentAddressGroup(servers);
Set<EquivalentAddressGroup> latestAddrs = stripAttrs(servers);
Set<EquivalentAddressGroup> addedAddrs = setsDifference(latestAddrs, currentAddrs);
Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs);
@ -184,15 +181,13 @@ public class RoundRobinLoadBalancerFactory extends LoadBalancer.Factory {
}
/**
* Converts list of {@link ResolvedServerInfoGroup} to {@link EquivalentAddressGroup} set.
* Converts list of {@link EquivalentAddressGroup} to {@link EquivalentAddressGroup} set and
* remove all attributes.
*/
private static Set<EquivalentAddressGroup> resolvedServerInfoGroupToEquivalentAddressGroup(
List<ResolvedServerInfoGroup> groupList) {
private static Set<EquivalentAddressGroup> stripAttrs(List<EquivalentAddressGroup> groupList) {
Set<EquivalentAddressGroup> addrs = new HashSet<EquivalentAddressGroup>();
for (ResolvedServerInfoGroup group : groupList) {
for (ResolvedServerInfo server : group.getResolvedServerInfoList()) {
addrs.add(new EquivalentAddressGroup(server.getAddress()));
}
for (EquivalentAddressGroup group : groupList) {
addrs.add(new EquivalentAddressGroup(group.getAddresses()));
}
return addrs;
}

View File

@ -68,7 +68,7 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class PickFirstLoadBalancerTest {
private PickFirstBalancer loadBalancer;
private List<ResolvedServerInfoGroup> servers = Lists.newArrayList();
private List<EquivalentAddressGroup> servers = Lists.newArrayList();
private List<SocketAddress> socketAddresses = Lists.newArrayList();
private static final Attributes.Key<String> FOO = Attributes.Key.of("foo");
@ -92,7 +92,7 @@ public class PickFirstLoadBalancerTest {
MockitoAnnotations.initMocks(this);
for (int i = 0; i < 3; i++) {
SocketAddress addr = new FakeSocketAddress("server" + i);
servers.add(ResolvedServerInfoGroup.builder().add(new ResolvedServerInfo(addr)).build());
servers.add(new EquivalentAddressGroup(addr));
socketAddresses.add(addr);
}
@ -111,7 +111,7 @@ public class PickFirstLoadBalancerTest {
@Test
public void pickAfterResolved() throws Exception {
loadBalancer.handleResolvedAddresses(servers, affinity);
loadBalancer.handleResolvedAddressGroups(servers, affinity);
verify(mockHelper).createSubchannel(eagCaptor.capture(), attrsCaptor.capture());
verify(mockHelper).updatePicker(pickerCaptor.capture());
@ -125,8 +125,8 @@ public class PickFirstLoadBalancerTest {
@Test
public void pickAfterResolvedAndUnchanged() throws Exception {
loadBalancer.handleResolvedAddresses(servers, affinity);
loadBalancer.handleResolvedAddresses(servers, affinity);
loadBalancer.handleResolvedAddressGroups(servers, affinity);
loadBalancer.handleResolvedAddressGroups(servers, affinity);
verify(mockHelper).createSubchannel(any(EquivalentAddressGroup.class),
any(Attributes.class));
@ -139,8 +139,8 @@ public class PickFirstLoadBalancerTest {
public void pickAfterResolvedAndChanged() throws Exception {
SocketAddress socketAddr = new FakeSocketAddress("newserver");
List<SocketAddress> newSocketAddresses = Lists.newArrayList(socketAddr);
List<ResolvedServerInfoGroup> newServers = Lists.newArrayList(
ResolvedServerInfoGroup.builder().add(new ResolvedServerInfo(socketAddr)).build());
List<EquivalentAddressGroup> newServers =
Lists.newArrayList(new EquivalentAddressGroup(socketAddr));
final Subchannel oldSubchannel = mock(Subchannel.class);
final EquivalentAddressGroup oldEag = new EquivalentAddressGroup(socketAddresses);
@ -155,12 +155,12 @@ public class PickFirstLoadBalancerTest {
InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleResolvedAddresses(servers, affinity);
loadBalancer.handleResolvedAddressGroups(servers, affinity);
inOrder.verify(mockHelper).createSubchannel(eagCaptor.capture(), any(Attributes.class));
inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
assertEquals(socketAddresses, eagCaptor.getValue().getAddresses());
loadBalancer.handleResolvedAddresses(newServers, affinity);
loadBalancer.handleResolvedAddressGroups(newServers, affinity);
inOrder.verify(mockHelper).createSubchannel(eagCaptor.capture(), any(Attributes.class));
inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
assertEquals(newSocketAddresses, eagCaptor.getValue().getAddresses());
@ -187,7 +187,7 @@ public class PickFirstLoadBalancerTest {
@Test
public void pickAfterStateChangeAfterResolution() throws Exception {
loadBalancer.handleResolvedAddresses(servers, affinity);
loadBalancer.handleResolvedAddressGroups(servers, affinity);
verify(mockHelper).updatePicker(pickerCaptor.capture());
Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel();
reset(mockHelper);
@ -232,7 +232,7 @@ public class PickFirstLoadBalancerTest {
loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));
inOrder.verify(mockHelper).updatePicker(any(Picker.class));
loadBalancer.handleResolvedAddresses(servers, affinity);
loadBalancer.handleResolvedAddressGroups(servers, affinity);
inOrder.verify(mockHelper).createSubchannel(eq(new EquivalentAddressGroup(socketAddresses)),
eq(Attributes.EMPTY));
inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());

View File

@ -1,131 +0,0 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.fail;
import java.net.InetSocketAddress;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class ResolvedServerInfoTest {
private static final Attributes.Key<String> FOO = Attributes.Key.of("foo");
private static final Attributes ATTRS = Attributes.newBuilder().set(FOO, "bar").build();
@Test
public void accessors() {
InetSocketAddress addr = InetSocketAddress.createUnresolved("foo", 123);
ResolvedServerInfo server = new ResolvedServerInfo(addr, ATTRS);
assertEquals(addr, server.getAddress());
assertEquals(ATTRS, server.getAttributes());
// unspecified attributes treated as empty
server = new ResolvedServerInfo(addr);
assertEquals(addr, server.getAddress());
assertEquals(Attributes.EMPTY, server.getAttributes());
}
@Test public void cannotUseNullAddress() {
try {
new ResolvedServerInfo(null, ATTRS);
fail("Should not have been allowd to create info with null address");
} catch (NullPointerException e) {
// expected
}
}
@Test public void equals_true() {
ResolvedServerInfo server1 = new ResolvedServerInfo(
InetSocketAddress.createUnresolved("foo", 123),
Attributes.newBuilder().set(FOO, "bar").build());
ResolvedServerInfo server2 = new ResolvedServerInfo(
InetSocketAddress.createUnresolved("foo", 123),
Attributes.newBuilder().set(FOO, "bar").build());
// sanity checks that they're not same instances
assertNotSame(server1.getAddress(), server2.getAddress());
assertNotSame(server1.getAttributes(), server2.getAttributes());
assertEquals(server1, server2);
assertEquals(server1.hashCode(), server2.hashCode()); // hash code must be consistent
// empty attributes
server1 = new ResolvedServerInfo(InetSocketAddress.createUnresolved("foo", 123));
server2 = new ResolvedServerInfo(
InetSocketAddress.createUnresolved("foo", 123), Attributes.EMPTY);
assertEquals(server1, server2);
assertEquals(server1.hashCode(), server2.hashCode());
}
@Test public void equals_falseDifferentAddresses() {
ResolvedServerInfo server1 = new ResolvedServerInfo(
InetSocketAddress.createUnresolved("foo", 123),
Attributes.newBuilder().set(FOO, "bar").build());
ResolvedServerInfo server2 = new ResolvedServerInfo(
InetSocketAddress.createUnresolved("foo", 456),
Attributes.newBuilder().set(FOO, "bar").build());
assertNotEquals(server1, server2);
// hash code could collide, but this assertion is safe because, in this example, they do not
assertNotEquals(server1.hashCode(), server2.hashCode());
}
@Test public void equals_falseDifferentAttributes() {
ResolvedServerInfo server1 = new ResolvedServerInfo(
InetSocketAddress.createUnresolved("foo", 123),
Attributes.newBuilder().set(FOO, "bar").build());
ResolvedServerInfo server2 = new ResolvedServerInfo(
InetSocketAddress.createUnresolved("foo", 123),
Attributes.newBuilder().set(FOO, "baz").build());
assertNotEquals(server1, server2);
// hash code could collide, but these assertions are safe because, in these examples, they don't
assertNotEquals(server1.hashCode(), server2.hashCode());
// same values but extra key? still not equal
server2 = new ResolvedServerInfo(
InetSocketAddress.createUnresolved("foo", 456),
Attributes.newBuilder()
.set(FOO, "bar")
.set(Attributes.Key.of("fiz"), "buz")
.build());
assertNotEquals(server1, server2);
assertNotEquals(server1.hashCode(), server2.hashCode());
}
}

View File

@ -42,8 +42,8 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status;
import io.grpc.internal.SharedResourceHolder.Resource;
import java.net.InetAddress;
@ -104,7 +104,7 @@ public class DnsNameResolverTest {
@Mock
private NameResolver.Listener mockListener;
@Captor
private ArgumentCaptor<List<ResolvedServerInfoGroup>> resultCaptor;
private ArgumentCaptor<List<EquivalentAddressGroup>> resultCaptor;
@Captor
private ArgumentCaptor<Status> statusCaptor;
@ -149,16 +149,16 @@ public class DnsNameResolverTest {
MockResolver resolver = new MockResolver(name, 81, answer1, answer2);
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onUpdate(resultCaptor.capture(), any(Attributes.class));
verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, resolver.invocations.poll());
assertAnswerMatches(answer1, 81, Iterables.getOnlyElement(resultCaptor.getValue()));
assertAnswerMatches(answer1, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
resolver.refresh();
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener, times(2)).onUpdate(resultCaptor.capture(), any(Attributes.class));
verify(mockListener, times(2)).onAddresses(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, resolver.invocations.poll());
assertAnswerMatches(answer2, 81, Iterables.getOnlyElement(resultCaptor.getValue()));
assertAnswerMatches(answer2, 81, resultCaptor.getValue());
assertEquals(0, fakeClock.numPendingTasks());
resolver.shutdown();
@ -201,9 +201,9 @@ public class DnsNameResolverTest {
fakeClock.forwardNanos(1);
assertEquals(0, fakeClock.numPendingTasks());
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onUpdate(resultCaptor.capture(), any(Attributes.class));
verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, resolver.invocations.poll());
assertAnswerMatches(answer, 81, Iterables.getOnlyElement(resultCaptor.getValue()));
assertAnswerMatches(answer, 81, resultCaptor.getValue());
verifyNoMoreInteractions(mockListener);
}
@ -229,9 +229,9 @@ public class DnsNameResolverTest {
assertEquals(1, fakeExecutor.runDueTasks());
// Refresh cancelled the retry
assertEquals(0, fakeClock.numPendingTasks());
verify(mockListener).onUpdate(resultCaptor.capture(), any(Attributes.class));
verify(mockListener).onAddresses(resultCaptor.capture(), any(Attributes.class));
assertEquals(name, resolver.invocations.poll());
assertAnswerMatches(answer, 81, Iterables.getOnlyElement(resultCaptor.getValue()));
assertAnswerMatches(answer, 81, resultCaptor.getValue());
verifyNoMoreInteractions(mockListener);
}
@ -286,12 +286,13 @@ public class DnsNameResolverTest {
return list;
}
private static void assertAnswerMatches(InetAddress[] addrs, int port,
ResolvedServerInfoGroup result) {
assertEquals(addrs.length, result.getResolvedServerInfoList().size());
private static void assertAnswerMatches(
InetAddress[] addrs, int port, List<EquivalentAddressGroup> results) {
assertEquals(addrs.length, results.size());
for (int i = 0; i < addrs.length; i++) {
InetSocketAddress socketAddr = (InetSocketAddress) result.getResolvedServerInfoList().get(
i).getAddress();
EquivalentAddressGroup addrGroup = results.get(i);
InetSocketAddress socketAddr =
(InetSocketAddress) Iterables.getOnlyElement(addrGroup.getAddresses());
assertEquals("Addr " + i, port, socketAddr.getPort());
assertEquals("Addr " + i, addrs[i], socketAddr.getAddress());
}

View File

@ -63,8 +63,6 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status;
import io.grpc.StringMarshaller;
import io.grpc.internal.TestUtils.MockClientTransportInfo;
@ -108,9 +106,7 @@ public class ManagedChannelImplIdlenessTest {
.setResponseMarshaller(new IntegerMarshaller())
.build();
private final List<ResolvedServerInfoGroup> servers = Lists.newArrayList();
private final List<EquivalentAddressGroup> addressGroupList =
new ArrayList<EquivalentAddressGroup>();
private final List<EquivalentAddressGroup> servers = Lists.newArrayList();
@Mock private ObjectPool<ScheduledExecutorService> timerServicePool;
@Mock private ObjectPool<Executor> executorPool;
@ -147,13 +143,11 @@ public class ManagedChannelImplIdlenessTest {
newTransports = TestUtils.captureTransports(mockTransportFactory);
for (int i = 0; i < 2; i++) {
ResolvedServerInfoGroup.Builder resolvedServerInfoGroup = ResolvedServerInfoGroup.builder();
ArrayList<SocketAddress> addrs = Lists.newArrayList();
for (int j = 0; j < 2; j++) {
resolvedServerInfoGroup.add(
new ResolvedServerInfo(new FakeSocketAddress("servergroup" + i + "server" + j)));
addrs.add(new FakeSocketAddress("servergroup" + i + "server" + j));
}
servers.add(resolvedServerInfoGroup.build());
addressGroupList.add(resolvedServerInfoGroup.build().toEquivalentAddressGroup());
servers.add(new EquivalentAddressGroup(addrs));
}
verify(mockNameResolverFactory).newNameResolver(any(URI.class), any(Attributes.class));
// Verify the initial idleness
@ -179,8 +173,8 @@ public class ManagedChannelImplIdlenessTest {
verify(mockNameResolver).start(nameResolverListenerCaptor.capture());
// Simulate new address resolved to make sure the LoadBalancer is correctly linked to
// the NameResolver.
nameResolverListenerCaptor.getValue().onUpdate(servers, Attributes.EMPTY);
verify(mockLoadBalancer).handleResolvedAddresses(servers, Attributes.EMPTY);
nameResolverListenerCaptor.getValue().onAddresses(servers, Attributes.EMPTY);
verify(mockLoadBalancer).handleResolvedAddressGroups(servers, Attributes.EMPTY);
}
@Test
@ -245,7 +239,7 @@ public class ManagedChannelImplIdlenessTest {
@Test
public void realTransportsHoldsOffIdleness() throws Exception {
final EquivalentAddressGroup addressGroup = addressGroupList.get(1);
final EquivalentAddressGroup addressGroup = servers.get(1);
// Start a call, which goes to delayed transport
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
@ -313,7 +307,7 @@ public class ManagedChannelImplIdlenessTest {
assertFalse(channel.inUseStateAggregator.isInUse());
// Now make an RPC on an OOB channel
ManagedChannel oob = helper.createOobChannel(addressGroupList.get(0), "oobauthority");
ManagedChannel oob = helper.createOobChannel(servers.get(0), "oobauthority");
verify(mockTransportFactory, never())
.newClientTransport(any(SocketAddress.class), same("oobauthority"), same(USER_AGENT));
ClientCall<String, Integer> oobCall = oob.newCall(method, CallOptions.DEFAULT);

View File

@ -82,8 +82,6 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.SecurityLevel;
import io.grpc.Status;
import io.grpc.StringMarshaller;
@ -140,7 +138,6 @@ public class ManagedChannelImplTest {
private URI expectedUri;
private final SocketAddress socketAddress = new SocketAddress() {};
private final EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY);
private final FakeClock timer = new FakeClock();
private final FakeClock executor = new FakeClock();
private final FakeClock oobExecutor = new FakeClock();
@ -515,8 +512,8 @@ public class ManagedChannelImplTest {
createChannel(nameResolverFactory, NO_INTERCEPTOR);
verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
doThrow(ex).when(mockLoadBalancer).handleResolvedAddresses(
Matchers.<List<ResolvedServerInfoGroup>>anyObject(), any(Attributes.class));
doThrow(ex).when(mockLoadBalancer).handleResolvedAddressGroups(
Matchers.<List<EquivalentAddressGroup>>anyObject(), any(Attributes.class));
// NameResolver returns addresses.
nameResolverFactory.allResolved();
@ -561,16 +558,10 @@ public class ManagedChannelImplTest {
return "badAddress";
}
};
final ResolvedServerInfo goodServer = new ResolvedServerInfo(goodAddress, Attributes.EMPTY);
final ResolvedServerInfo badServer = new ResolvedServerInfo(badAddress, Attributes.EMPTY);
InOrder inOrder = inOrder(mockLoadBalancer);
ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup.builder()
.add(badServer)
.add(goodServer)
.build();
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory(serverInfoGroup.getResolvedServerInfoList());
List<SocketAddress> resolvedAddrs = Arrays.asList(badAddress, goodAddress);
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(resolvedAddrs);
createChannel(nameResolverFactory, NO_INTERCEPTOR);
// Start the call
@ -580,10 +571,10 @@ public class ManagedChannelImplTest {
executor.runDueTasks();
// Simulate name resolution results
inOrder.verify(mockLoadBalancer).handleResolvedAddresses(
eq(Arrays.asList(serverInfoGroup)), eq(Attributes.EMPTY));
Subchannel subchannel = helper.createSubchannel(
serverInfoGroup.toEquivalentAddressGroup(), Attributes.EMPTY);
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs);
inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups(
eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY));
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
subchannel.requestConnection();
@ -644,17 +635,11 @@ public class ManagedChannelImplTest {
return "addr2";
}
};
final ResolvedServerInfo server1 = new ResolvedServerInfo(addr1, Attributes.EMPTY);
final ResolvedServerInfo server2 = new ResolvedServerInfo(addr2, Attributes.EMPTY);
InOrder inOrder = inOrder(mockLoadBalancer);
ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup.builder()
.add(server1)
.add(server2)
.build();
List<SocketAddress> resolvedAddrs = Arrays.asList(addr1, addr2);
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory(serverInfoGroup.getResolvedServerInfoList());
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(resolvedAddrs);
createChannel(nameResolverFactory, NO_INTERCEPTOR);
// Start a wait-for-ready call
@ -669,10 +654,10 @@ public class ManagedChannelImplTest {
executor.runDueTasks();
// Simulate name resolution results
inOrder.verify(mockLoadBalancer).handleResolvedAddresses(
eq(Arrays.asList(serverInfoGroup)), eq(Attributes.EMPTY));
Subchannel subchannel = helper.createSubchannel(
serverInfoGroup.toEquivalentAddressGroup(), Attributes.EMPTY);
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs);
inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups(
eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY));
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
subchannel.requestConnection();
@ -1005,8 +990,6 @@ public class ManagedChannelImplTest {
*/
@Test
public void informationPropagatedToNewStreamAndCallCredentials() {
ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup.builder()
.add(server).build();
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
CallOptions callOptions = CallOptions.DEFAULT.withCallCredentials(creds);
final Context.Key<String> testKey = Context.key("testing");
@ -1034,8 +1017,8 @@ public class ManagedChannelImplTest {
call.start(mockCallListener, new Metadata());
// Simulate name resolution results
Subchannel subchannel = helper.createSubchannel(
serverInfoGroup.toEquivalentAddressGroup(), Attributes.EMPTY);
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(socketAddress);
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
subchannel.requestConnection();
verify(mockTransportFactory).newClientTransport(
same(socketAddress), eq(authority), eq(userAgent));
@ -1122,19 +1105,18 @@ public class ManagedChannelImplTest {
}
private class FakeNameResolverFactory extends NameResolver.Factory {
final List<ResolvedServerInfoGroup> servers;
final List<EquivalentAddressGroup> servers;
final boolean resolvedAtStart;
final ArrayList<FakeNameResolver> resolvers = new ArrayList<FakeNameResolver>();
FakeNameResolverFactory(boolean resolvedAtStart) {
this.resolvedAtStart = resolvedAtStart;
servers = Collections.singletonList(ResolvedServerInfoGroup.builder().add(server).build());
servers = Collections.singletonList(new EquivalentAddressGroup(socketAddress));
}
FakeNameResolverFactory(List<ResolvedServerInfo> servers) {
FakeNameResolverFactory(List<SocketAddress> servers) {
resolvedAtStart = true;
this.servers = Collections.singletonList(
ResolvedServerInfoGroup.builder().addAll(servers).build());
this.servers = Collections.singletonList(new EquivalentAddressGroup(servers));
}
public FakeNameResolverFactory() {
@ -1180,7 +1162,7 @@ public class ManagedChannelImplTest {
}
void resolved() {
listener.onUpdate(servers, Attributes.EMPTY);
listener.onAddresses(servers, Attributes.EMPTY);
}
@Override public void shutdown() {

View File

@ -58,8 +58,6 @@ import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status;
import io.grpc.util.RoundRobinLoadBalancerFactory.Picker;
import io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer;
@ -86,7 +84,7 @@ import org.mockito.stubbing.Answer;
@RunWith(JUnit4.class)
public class RoundRobinLoadBalancerTest {
private RoundRobinLoadBalancer loadBalancer;
private Map<ResolvedServerInfoGroup, EquivalentAddressGroup> servers = Maps.newHashMap();
private List<EquivalentAddressGroup> servers = Lists.newArrayList();
private Map<EquivalentAddressGroup, Subchannel> subchannels = Maps.newLinkedHashMap();
private static final Attributes.Key<String> MAJOR_KEY = Attributes.Key.of("major-key");
private Attributes affinity = Attributes.newBuilder().set(MAJOR_KEY, "I got the keys").build();
@ -109,7 +107,7 @@ public class RoundRobinLoadBalancerTest {
for (int i = 0; i < 3; i++) {
SocketAddress addr = new FakeSocketAddress("server" + i);
EquivalentAddressGroup eag = new EquivalentAddressGroup(addr);
servers.put(ResolvedServerInfoGroup.builder().add(new ResolvedServerInfo(addr)).build(), eag);
servers.add(eag);
subchannels.put(eag, mock(Subchannel.class));
}
@ -136,7 +134,7 @@ public class RoundRobinLoadBalancerTest {
@Test
public void pickAfterResolved() throws Exception {
final Subchannel readySubchannel = subchannels.values().iterator().next();
loadBalancer.handleResolvedAddresses(Lists.newArrayList(servers.keySet()), affinity);
loadBalancer.handleResolvedAddressGroups(servers, affinity);
loadBalancer.handleSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
verify(mockHelper, times(3)).createSubchannel(eagCaptor.capture(),
@ -176,11 +174,10 @@ public class RoundRobinLoadBalancerTest {
subchannels2.put(new EquivalentAddressGroup(removedAddr), removedSubchannel);
subchannels2.put(new EquivalentAddressGroup(oldAddr), oldSubchannel);
List<ResolvedServerInfoGroup> currentServers = Lists.newArrayList(
ResolvedServerInfoGroup.builder()
.add(new ResolvedServerInfo(removedAddr))
.add(new ResolvedServerInfo(oldAddr))
.build());
List<EquivalentAddressGroup> currentServers =
Lists.newArrayList(
new EquivalentAddressGroup(removedAddr),
new EquivalentAddressGroup(oldAddr));
doAnswer(new Answer<Subchannel>() {
@Override
@ -190,7 +187,7 @@ public class RoundRobinLoadBalancerTest {
}
}).when(mockHelper).createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class));
loadBalancer.handleResolvedAddresses(currentServers, affinity);
loadBalancer.handleResolvedAddressGroups(currentServers, affinity);
InOrder inOrder = inOrder(mockHelper);
@ -209,13 +206,12 @@ public class RoundRobinLoadBalancerTest {
subchannels2.put(new EquivalentAddressGroup(oldAddr), oldSubchannel);
subchannels2.put(new EquivalentAddressGroup(newAddr), newSubchannel);
List<ResolvedServerInfoGroup> latestServers = Lists.newArrayList(
ResolvedServerInfoGroup.builder()
.add(new ResolvedServerInfo(oldAddr))
.add(new ResolvedServerInfo(newAddr))
.build());
List<EquivalentAddressGroup> latestServers =
Lists.newArrayList(
new EquivalentAddressGroup(oldAddr),
new EquivalentAddressGroup(newAddr));
loadBalancer.handleResolvedAddresses(latestServers, affinity);
loadBalancer.handleResolvedAddressGroups(latestServers, affinity);
verify(newSubchannel, times(1)).requestConnection();
verify(removedSubchannel, times(1)).shutdown();
@ -253,7 +249,7 @@ public class RoundRobinLoadBalancerTest {
@Test
public void pickAfterStateChange() throws Exception {
InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleResolvedAddresses(Lists.newArrayList(servers.keySet()), Attributes.EMPTY);
loadBalancer.handleResolvedAddressGroups(servers, Attributes.EMPTY);
Subchannel subchannel = loadBalancer.getSubchannels().iterator().next();
AtomicReference<ConnectivityStateInfo> subchannelStateInfo = subchannel.getAttributes().get(
STATE_INFO);
@ -329,7 +325,7 @@ public class RoundRobinLoadBalancerTest {
@Test
public void nameResolutionErrorWithActiveChannels() throws Exception {
final Subchannel readySubchannel = subchannels.values().iterator().next();
loadBalancer.handleResolvedAddresses(Lists.newArrayList(servers.keySet()), affinity);
loadBalancer.handleResolvedAddressGroups(servers, affinity);
loadBalancer.handleSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));
@ -353,7 +349,7 @@ public class RoundRobinLoadBalancerTest {
Subchannel sc2 = subchannelIterator.next();
Subchannel sc3 = subchannelIterator.next();
loadBalancer.handleResolvedAddresses(Lists.newArrayList(servers.keySet()), Attributes.EMPTY);
loadBalancer.handleResolvedAddressGroups(servers, Attributes.EMPTY);
verify(sc1, times(1)).requestConnection();
verify(sc2, times(1)).requestConnection();
verify(sc3, times(1)).requestConnection();

View File

@ -48,7 +48,6 @@ import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status;
import io.grpc.grpclb.GrpclbConstants.LbPolicy;
import io.grpc.internal.LogId;
@ -167,26 +166,23 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
}
@Override
public void handleResolvedAddresses(List<ResolvedServerInfoGroup> updatedServers,
Attributes attributes) {
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> updatedServers, Attributes attributes) {
LbPolicy newLbPolicy = attributes.get(GrpclbConstants.ATTR_LB_POLICY);
// LB addresses and backend addresses are treated separately
List<LbAddressGroup> newLbAddressGroups = new ArrayList<LbAddressGroup>();
List<ResolvedServerInfoGroup> newBackendServerInfoGroups =
new ArrayList<ResolvedServerInfoGroup>();
for (ResolvedServerInfoGroup serverInfoGroup : updatedServers) {
String lbAddrAuthority = serverInfoGroup.getAttributes().get(
GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
EquivalentAddressGroup eag = serverInfoGroup.toEquivalentAddressGroup();
List<EquivalentAddressGroup> newBackendServers = new ArrayList<EquivalentAddressGroup>();
for (EquivalentAddressGroup server : updatedServers) {
String lbAddrAuthority = server.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
if (lbAddrAuthority != null) {
newLbAddressGroups.add(new LbAddressGroup(eag, lbAddrAuthority));
newLbAddressGroups.add(new LbAddressGroup(server, lbAddrAuthority));
} else {
newBackendServerInfoGroups.add(serverInfoGroup);
newBackendServers.add(server);
}
}
if (newBackendServerInfoGroups.isEmpty()) {
// handleResolvedAddresses()'s javadoc has guaranteed updatedServers is never empty.
if (newBackendServers.isEmpty()) {
// handleResolvedAddressGroups()'s javadoc has guaranteed updatedServers is never empty.
checkState(!newLbAddressGroups.isEmpty(),
"No backend address nor LB address. updatedServers=%s", updatedServers);
if (newLbPolicy != LbPolicy.GRPCLB) {
@ -226,7 +222,7 @@ class GrpclbLoadBalancer extends LoadBalancer implements WithLogId {
case PICK_FIRST:
case ROUND_ROBIN:
checkNotNull(delegate, "delegate should not be null. newLbPolicy=" + newLbPolicy);
delegate.handleResolvedAddresses(newBackendServerInfoGroups, attributes);
delegate.handleResolvedAddressGroups(newBackendServers, attributes);
break;
case GRPCLB:
if (newLbAddressGroups.isEmpty()) {

View File

@ -73,8 +73,6 @@ import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.grpclb.GrpclbConstants.LbPolicy;
@ -338,13 +336,13 @@ public class GrpclbLoadBalancerTest {
assertSame(error, errorPicker.result.getStatus());
// Recover with a subsequent success
List<ResolvedServerInfoGroup> resolvedServers = createResolvedServerInfoGroupList(false);
List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false);
Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build();
deliverResolvedAddresses(resolvedServers, resolutionAttrs);
verify(pickFirstBalancerFactory).newLoadBalancer(helper);
verify(pickFirstBalancer).handleResolvedAddresses(eq(resolvedServers), eq(resolutionAttrs));
verify(pickFirstBalancer).handleResolvedAddressGroups(eq(resolvedServers), eq(resolutionAttrs));
verifyNoMoreInteractions(roundRobinBalancerFactory);
verifyNoMoreInteractions(roundRobinBalancer);
}
@ -358,8 +356,8 @@ public class GrpclbLoadBalancerTest {
assertSame(error, errorPicker.result.getStatus());
// Recover with a subsequent success
List<ResolvedServerInfoGroup> resolvedServers = createResolvedServerInfoGroupList(true);
EquivalentAddressGroup eag = resolvedServers.get(0).toEquivalentAddressGroup();
List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(true);
EquivalentAddressGroup eag = resolvedServers.get(0);
Attributes resolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
@ -378,13 +376,13 @@ public class GrpclbLoadBalancerTest {
@Test
public void delegatingPickFirstThenNameResolutionFails() {
List<ResolvedServerInfoGroup> resolvedServers = createResolvedServerInfoGroupList(false);
List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false);
Attributes resolutionAttrs = Attributes.newBuilder().set(RESOLUTION_ATTR, "yeah").build();
deliverResolvedAddresses(resolvedServers, resolutionAttrs);
verify(pickFirstBalancerFactory).newLoadBalancer(helper);
verify(pickFirstBalancer).handleResolvedAddresses(eq(resolvedServers), eq(resolutionAttrs));
verify(pickFirstBalancer).handleResolvedAddressGroups(eq(resolvedServers), eq(resolutionAttrs));
// Then let name resolution fail. The error will be passed directly to the delegate.
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
@ -397,7 +395,7 @@ public class GrpclbLoadBalancerTest {
@Test
public void delegatingRoundRobinThenNameResolutionFails() {
List<ResolvedServerInfoGroup> resolvedServers = createResolvedServerInfoGroupList(false, false);
List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(false, false);
Attributes resolutionAttrs = Attributes.newBuilder()
.set(RESOLUTION_ATTR, "yeah")
@ -406,7 +404,7 @@ public class GrpclbLoadBalancerTest {
deliverResolvedAddresses(resolvedServers, resolutionAttrs);
verify(roundRobinBalancerFactory).newLoadBalancer(helper);
verify(roundRobinBalancer).handleResolvedAddresses(resolvedServers, resolutionAttrs);
verify(roundRobinBalancer).handleResolvedAddressGroups(resolvedServers, resolutionAttrs);
// Then let name resolution fail. The error will be passed directly to the delegate.
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
@ -421,16 +419,14 @@ public class GrpclbLoadBalancerTest {
public void grpclbThenNameResolutionFails() {
InOrder inOrder = inOrder(helper);
// Go to GRPCLB first
List<ResolvedServerInfoGroup> grpclbResolutionList =
createResolvedServerInfoGroupList(true, true);
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true, true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()),
eq(lbAuthority(0)));
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
@ -464,23 +460,22 @@ public class GrpclbLoadBalancerTest {
@Test
public void switchPolicy() {
// Go to GRPCLB first
List<ResolvedServerInfoGroup> grpclbResolutionList =
createResolvedServerInfoGroupList(true, false, true);
List<EquivalentAddressGroup> grpclbResolutionList =
createResolvedServerAddresses(true, false, true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()),
eq(lbAuthority(0)));
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
// Switch to PICK_FIRST
List<ResolvedServerInfoGroup> pickFirstResolutionList =
createResolvedServerInfoGroupList(true, false, true);
List<EquivalentAddressGroup> pickFirstResolutionList =
createResolvedServerAddresses(true, false, true);
Attributes pickFirstResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.PICK_FIRST).build();
verify(pickFirstBalancerFactory, never()).newLoadBalancer(any(Helper.class));
@ -493,7 +488,7 @@ public class GrpclbLoadBalancerTest {
verify(pickFirstBalancerFactory).newLoadBalancer(same(helper));
// Only non-LB addresses are passed to the delegate
verify(pickFirstBalancer).handleResolvedAddresses(
verify(pickFirstBalancer).handleResolvedAddressGroups(
eq(Arrays.asList(pickFirstResolutionList.get(1))), same(pickFirstResolutionAttrs));
assertSame(LbPolicy.PICK_FIRST, balancer.getLbPolicy());
assertSame(pickFirstBalancer, balancer.getDelegate());
@ -502,8 +497,8 @@ public class GrpclbLoadBalancerTest {
assertTrue(oobChannel.isShutdown());
// Switch to ROUND_ROBIN
List<ResolvedServerInfoGroup> roundRobinResolutionList =
createResolvedServerInfoGroupList(true, false, false);
List<EquivalentAddressGroup> roundRobinResolutionList =
createResolvedServerAddresses(true, false, false);
Attributes roundRobinResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.ROUND_ROBIN).build();
verify(roundRobinBalancerFactory, never()).newLoadBalancer(any(Helper.class));
@ -511,30 +506,28 @@ public class GrpclbLoadBalancerTest {
verify(roundRobinBalancerFactory).newLoadBalancer(same(helper));
// Only non-LB addresses are passed to the delegate
verify(roundRobinBalancer).handleResolvedAddresses(
verify(roundRobinBalancer).handleResolvedAddressGroups(
eq(roundRobinResolutionList.subList(1, 3)), same(roundRobinResolutionAttrs));
assertSame(LbPolicy.ROUND_ROBIN, balancer.getLbPolicy());
assertSame(roundRobinBalancer, balancer.getDelegate());
// Special case: if all addresses are loadbalancers, use GRPCLB no matter what the NameResolver
// says.
grpclbResolutionList = createResolvedServerInfoGroupList(true, true, true);
grpclbResolutionList = createResolvedServerAddresses(true, true, true);
grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.PICK_FIRST).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper, times(2)).createOobChannel(
eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()),
eq(lbAuthority(0)));
verify(helper, times(2)).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
verify(helper, times(2)).createOobChannel(any(EquivalentAddressGroup.class), any(String.class));
assertEquals(1, fakeOobChannels.size());
oobChannel = fakeOobChannels.poll();
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
// Special case: PICK_FIRST is the default
pickFirstResolutionList = createResolvedServerInfoGroupList(true, false, false);
pickFirstResolutionList = createResolvedServerAddresses(true, false, false);
pickFirstResolutionAttrs = Attributes.EMPTY;
verify(pickFirstBalancerFactory).newLoadBalancer(any(Helper.class));
assertFalse(oobChannel.isShutdown());
@ -542,7 +535,7 @@ public class GrpclbLoadBalancerTest {
verify(pickFirstBalancerFactory, times(2)).newLoadBalancer(same(helper));
// Only non-LB addresses are passed to the delegate
verify(pickFirstBalancer).handleResolvedAddresses(
verify(pickFirstBalancer).handleResolvedAddressGroups(
eq(pickFirstResolutionList.subList(1, 3)), same(pickFirstResolutionAttrs));
assertSame(LbPolicy.PICK_FIRST, balancer.getLbPolicy());
assertSame(pickFirstBalancer, balancer.getDelegate());
@ -553,16 +546,14 @@ public class GrpclbLoadBalancerTest {
@Test
public void grpclbWorking() {
InOrder inOrder = inOrder(helper);
List<ResolvedServerInfoGroup> grpclbResolutionList =
createResolvedServerInfoGroupList(true, true);
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true, true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()),
eq(lbAuthority(0)));
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
@ -700,25 +691,21 @@ public class GrpclbLoadBalancerTest {
InOrder inOrder = inOrder(helper, mockLbService);
// Make the first LB address fail to connect
failingLbAuthorities.add(lbAuthority(0));
List<ResolvedServerInfoGroup> grpclbResolutionList =
createResolvedServerInfoGroupList(true, true, true);
List<EquivalentAddressGroup> grpclbResolutionList =
createResolvedServerAddresses(true, true, true);
Attributes grpclbResolutionAttrs = Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
// First LB addr fails to connect
inOrder.verify(helper).createOobChannel(
eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()),
eq(lbAuthority(0)));
inOrder.verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
inOrder.verify(helper).updatePicker(isA(ErrorPicker.class));
assertEquals(2, fakeOobChannels.size());
assertTrue(fakeOobChannels.poll().isShutdown());
// Will move on to second LB addr
inOrder.verify(helper).createOobChannel(
eq(grpclbResolutionList.get(1).toEquivalentAddressGroup()),
eq(lbAuthority(1)));
inOrder.verify(helper).createOobChannel(eq(grpclbResolutionList.get(1)), eq(lbAuthority(1)));
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
@ -736,9 +723,7 @@ public class GrpclbLoadBalancerTest {
assertEquals(error1.getCode(), errorPicker.result.getStatus().getCode());
assertTrue(errorPicker.result.getStatus().getDescription().contains(error1.getDescription()));
// Move on to the third LB.
inOrder.verify(helper).createOobChannel(
eq(grpclbResolutionList.get(2).toEquivalentAddressGroup()),
eq(lbAuthority(2)));
inOrder.verify(helper).createOobChannel(eq(grpclbResolutionList.get(2)), eq(lbAuthority(2)));
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
@ -753,17 +738,13 @@ public class GrpclbLoadBalancerTest {
assertTrue(fakeOobChannels.poll().isShutdown());
// Loop back to the first LB addr, which still fails.
inOrder.verify(helper).createOobChannel(
eq(grpclbResolutionList.get(0).toEquivalentAddressGroup()),
eq(lbAuthority(0)));
inOrder.verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
inOrder.verify(helper).updatePicker(isA(ErrorPicker.class));
assertEquals(2, fakeOobChannels.size());
assertTrue(fakeOobChannels.poll().isShutdown());
// Will move on to second LB addr
inOrder.verify(helper).createOobChannel(
eq(grpclbResolutionList.get(1).toEquivalentAddressGroup()),
eq(lbAuthority(1)));
inOrder.verify(helper).createOobChannel(eq(grpclbResolutionList.get(1)), eq(lbAuthority(1)));
inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
@ -805,27 +786,26 @@ public class GrpclbLoadBalancerTest {
}
private void deliverResolvedAddresses(
final List<ResolvedServerInfoGroup> addrs, final Attributes attrs) {
final List<EquivalentAddressGroup> addrs, final Attributes attrs) {
channelExecutor.execute(new Runnable() {
@Override
public void run() {
balancer.handleResolvedAddresses(addrs, attrs);
balancer.handleResolvedAddressGroups(addrs, attrs);
}
});
}
private static List<ResolvedServerInfoGroup> createResolvedServerInfoGroupList(boolean ... isLb) {
ArrayList<ResolvedServerInfoGroup> list = new ArrayList<ResolvedServerInfoGroup>();
private static List<EquivalentAddressGroup> createResolvedServerAddresses(boolean ... isLb) {
ArrayList<EquivalentAddressGroup> list = new ArrayList<EquivalentAddressGroup>();
for (int i = 0; i < isLb.length; i++) {
SocketAddress addr = new FakeSocketAddress("fake-address-" + i);
ResolvedServerInfoGroup serverInfoGroup = ResolvedServerInfoGroup
.builder(isLb[i] ? Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, lbAuthority(i))
.build()
: Attributes.EMPTY)
.add(new ResolvedServerInfo(addr))
.build();
list.add(serverInfoGroup);
EquivalentAddressGroup eag =
new EquivalentAddressGroup(
addr,
isLb[i] ? Attributes.newBuilder()
.set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, lbAuthority(i))
.build() : Attributes.EMPTY);
list.add(eag);
}
return list;
}