all: add LoadBalancer overload for Resolution results

This commit is contained in:
Carl Mastrangelo 2019-03-29 09:31:24 -07:00 committed by GitHub
parent 026e4c53bd
commit 17d67f17fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 717 additions and 267 deletions

View File

@ -16,9 +16,12 @@
package io.grpc; package io.grpc;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -121,11 +124,184 @@ public abstract class LoadBalancer {
* *
* @param servers the resolved server addresses, never empty. * @param servers the resolved server addresses, never empty.
* @param attributes extra information from naming system. * @param attributes extra information from naming system.
* @deprecated override {@link #handleResolvedAddresses(ResolvedAddresses) instead}
* @since 1.2.0 * @since 1.2.0
*/ */
public abstract void handleResolvedAddressGroups( @Deprecated
public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, List<EquivalentAddressGroup> servers,
@NameResolver.ResolutionResultAttr Attributes attributes); @NameResolver.ResolutionResultAttr Attributes attributes) {
throw new UnsupportedOperationException("Not implemented");
}
/**
* Handles newly resolved server groups and metadata attributes from name resolution system.
* {@code servers} contained in {@link EquivalentAddressGroup} should be considered equivalent
* but may be flattened into a single list if needed.
*
* <p>Implementations should not modify the given {@code servers}.
*
* @param resolvedAddresses the resolved server addresses, attributes, and config.
* @since 1.21.0
*/
@SuppressWarnings("deprecation")
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
handleResolvedAddressGroups(resolvedAddresses.getServers(), resolvedAddresses.getAttributes());
}
/**
* Represents a combination of the resolved server address, associated attributes and a load
* balancing policy config. The config is from the {@link
* LoadBalancerProvider#parseLoadBalancingPolicyConfig(Map)}.
*
* @since 1.21.0
*/
public static final class ResolvedAddresses {
private final List<EquivalentAddressGroup> servers;
@NameResolver.ResolutionResultAttr
private final Attributes attributes;
@Nullable
private final Object loadBalancingPolicyConfig;
// Make sure to update toBuilder() below!
private ResolvedAddresses(
List<EquivalentAddressGroup> servers,
@NameResolver.ResolutionResultAttr Attributes attributes,
Object loadBalancingPolicyConfig) {
this.servers =
Collections.unmodifiableList(new ArrayList<>(checkNotNull(servers, "servers")));
this.attributes = checkNotNull(attributes, "attributes");
this.loadBalancingPolicyConfig = loadBalancingPolicyConfig;
}
/**
* Factory for constructing a new Builder.
*
* @since 1.21.0
*/
public static Builder newBuilder() {
return new Builder();
}
/**
* Converts this back to a builder.
*
* @since 1.21.0
*/
public Builder toBuilder() {
return newBuilder()
.setServers(servers)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(loadBalancingPolicyConfig);
}
/**
* Gets the server addresses.
*
* @since 1.21.0
*/
public List<EquivalentAddressGroup> getServers() {
return servers;
}
/**
* Gets the attributes associated with these addresses. If this was not previously set,
* {@link Attributes#EMPTY} will be returned.
*
* @since 1.21.0
*/
@NameResolver.ResolutionResultAttr
public Attributes getAttributes() {
return attributes;
}
/**
* Gets the domain specific load balancing policy. This is the config produced by
* {@link LoadBalancerProvider#parseLoadBalancingPolicyConfig(Map)}.
*
* @since 1.21.0
*/
@Nullable
public Object getLoadBalancingPolicyConfig() {
return loadBalancingPolicyConfig;
}
/**
* Builder for {@link ResolvedAddresses}.
*/
public static final class Builder {
private List<EquivalentAddressGroup> servers;
@NameResolver.ResolutionResultAttr
private Attributes attributes = Attributes.EMPTY;
@Nullable
private Object loadBalancingPolicyConfig;
Builder() {}
/**
* Sets the servers. This field is required.
*
* @return this.
*/
public Builder setServers(List<EquivalentAddressGroup> servers) {
this.servers = servers;
return this;
}
/**
* Sets the attributes. This field is optional; if not called, {@link Attributes#EMPTY}
* will be used.
*
* @return this.
*/
public Builder setAttributes(@NameResolver.ResolutionResultAttr Attributes attributes) {
this.attributes = attributes;
return this;
}
/**
* Sets the load balancing policy config. This field is optional.
*
* @return this.
*/
public Builder setLoadBalancingPolicyConfig(@Nullable Object loadBalancingPolicyConfig) {
this.loadBalancingPolicyConfig = loadBalancingPolicyConfig;
return this;
}
/**
* Constructs the {@link ResolvedAddresses}.
*/
public ResolvedAddresses build() {
return new ResolvedAddresses(servers, attributes, loadBalancingPolicyConfig);
}
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("servers", servers)
.add("attributes", attributes)
.add("loadBalancingPolicyConfig", loadBalancingPolicyConfig)
.toString();
}
@Override
public int hashCode() {
return Objects.hashCode(servers, attributes, loadBalancingPolicyConfig);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof ResolvedAddresses)) {
return false;
}
ResolvedAddresses that = (ResolvedAddresses) obj;
return Objects.equal(this.servers, that.servers)
&& Objects.equal(this.attributes, that.attributes)
&& Objects.equal(this.loadBalancingPolicyConfig, that.loadBalancingPolicyConfig);
}
}
/** /**
* Handles an error from the name resolution system. * Handles an error from the name resolution system.
@ -170,10 +346,10 @@ public abstract class LoadBalancer {
/** /**
* Whether this LoadBalancer can handle empty address group list to be passed to {@link * Whether this LoadBalancer can handle empty address group list to be passed to {@link
* #handleResolvedAddressGroups}. The default implementation returns {@code false}, meaning that * #handleResolvedAddresses(ResolvedAddresses)}. The default implementation returns
* if the NameResolver returns an empty list, the Channel will turn that into an error and call * {@code false}, meaning that if the NameResolver returns an empty list, the Channel will turn
* {@link #handleNameResolutionError}. LoadBalancers that want to accept empty lists should * that into an error and call {@link #handleNameResolutionError}. LoadBalancers that want to
* override this method and return {@code true}. * accept empty lists should override this method and return {@code true}.
* *
* <p>This method should always return a constant value. It's not specified when this will be * <p>This method should always return a constant value. It's not specified when this will be
* called. * called.
@ -279,7 +455,7 @@ public abstract class LoadBalancer {
Status status, boolean drop) { Status status, boolean drop) {
this.subchannel = subchannel; this.subchannel = subchannel;
this.streamTracerFactory = streamTracerFactory; this.streamTracerFactory = streamTracerFactory;
this.status = Preconditions.checkNotNull(status, "status"); this.status = checkNotNull(status, "status");
this.drop = drop; this.drop = drop;
} }
@ -353,7 +529,7 @@ public abstract class LoadBalancer {
public static PickResult withSubchannel( public static PickResult withSubchannel(
Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory) { Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory) {
return new PickResult( return new PickResult(
Preconditions.checkNotNull(subchannel, "subchannel"), streamTracerFactory, Status.OK, checkNotNull(subchannel, "subchannel"), streamTracerFactory, Status.OK,
false); false);
} }
@ -486,7 +662,7 @@ public abstract class LoadBalancer {
* @since 1.2.0 * @since 1.2.0
*/ */
public final Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) { public final Subchannel createSubchannel(EquivalentAddressGroup addrs, Attributes attrs) {
Preconditions.checkNotNull(addrs, "addrs"); checkNotNull(addrs, "addrs");
return createSubchannel(Collections.singletonList(addrs), attrs); return createSubchannel(Collections.singletonList(addrs), attrs);
} }
@ -518,7 +694,7 @@ public abstract class LoadBalancer {
*/ */
public final void updateSubchannelAddresses( public final void updateSubchannelAddresses(
Subchannel subchannel, EquivalentAddressGroup addrs) { Subchannel subchannel, EquivalentAddressGroup addrs) {
Preconditions.checkNotNull(addrs, "addrs"); checkNotNull(addrs, "addrs");
updateSubchannelAddresses(subchannel, Collections.singletonList(addrs)); updateSubchannelAddresses(subchannel, Collections.singletonList(addrs));
} }

View File

@ -55,7 +55,9 @@ public abstract class LoadBalancerProvider extends LoadBalancer.Factory {
* Parses the config for the Load Balancing policy unpacked from the service config. This will * Parses the config for the Load Balancing policy unpacked from the service config. This will
* return a {@link ConfigOrError} which contains either the successfully parsed config, or the * return a {@link ConfigOrError} which contains either the successfully parsed config, or the
* {@link Status} representing the failure to parse. Implementations are expected to not throw * {@link Status} representing the failure to parse. Implementations are expected to not throw
* exceptions but return a Status representing the failure. * exceptions but return a Status representing the failure. If successful, the load balancing
* policy config should be immutable, and implement {@link Object#equals(Object)} and
* {@link Object#hashCode()}.
* *
* @param rawLoadBalancingPolicyConfig The {@link Map} representation of the load balancing * @param rawLoadBalancingPolicyConfig The {@link Map} representation of the load balancing
* policy choice. * policy choice.

View File

@ -68,8 +68,12 @@ public final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factor
private static final class NoopLoadBalancer extends LoadBalancer { private static final class NoopLoadBalancer extends LoadBalancer {
@Override @Override
@Deprecated
public void handleResolvedAddressGroups(List<EquivalentAddressGroup> s, Attributes a) {} public void handleResolvedAddressGroups(List<EquivalentAddressGroup> s, Attributes a) {}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {}
@Override @Override
public void handleNameResolutionError(Status error) {} public void handleNameResolutionError(Status error) {}
@ -100,8 +104,9 @@ public final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factor
// Must be run inside ChannelExecutor. // Must be run inside ChannelExecutor.
@Override @Override
public void handleResolvedAddressGroups( public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers, Attributes attributes) { List<EquivalentAddressGroup> servers = resolvedAddresses.getServers();
Attributes attributes = resolvedAddresses.getAttributes();
if (attributes.get(ATTR_LOAD_BALANCING_CONFIG) != null) { if (attributes.get(ATTR_LOAD_BALANCING_CONFIG) != null) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Unexpected ATTR_LOAD_BALANCING_CONFIG from upstream: " "Unexpected ATTR_LOAD_BALANCING_CONFIG from upstream: "
@ -147,7 +152,11 @@ public final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factor
"Name resolver returned no usable address. addrs=" "Name resolver returned no usable address. addrs="
+ servers + ", attrs=" + attributes)); + servers + ", attrs=" + attributes));
} else { } else {
delegate.handleResolvedAddressGroups(selection.serverList, attributes); delegate.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(selection.serverList)
.setAttributes(attributes)
.build());
} }
} }

View File

@ -55,6 +55,7 @@ import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.Metadata; import io.grpc.Metadata;
@ -1385,7 +1386,11 @@ final class ManagedChannelImpl extends ManagedChannel implements
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, effectiveServiceConfig) .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, effectiveServiceConfig)
.build(); .build();
} }
helper.lb.handleResolvedAddressGroups(servers, effectiveAttrs); helper.lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(servers)
.setAttributes(effectiveAttrs)
.build());
} }
} }
} }

View File

@ -48,8 +48,8 @@ final class PickFirstLoadBalancer extends LoadBalancer {
} }
@Override @Override
public void handleResolvedAddressGroups( public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers, Attributes attributes) { List<EquivalentAddressGroup> servers = resolvedAddresses.getServers();
if (subchannel == null) { if (subchannel == null) {
subchannel = helper.createSubchannel(servers, Attributes.EMPTY); subchannel = helper.createSubchannel(servers, Attributes.EMPTY);

View File

@ -21,7 +21,6 @@ import io.grpc.Attributes;
import io.grpc.ConnectivityStateInfo; import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.ExperimentalApi; import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.Status; import io.grpc.Status;
@ -35,12 +34,18 @@ public abstract class ForwardingLoadBalancer extends LoadBalancer {
protected abstract LoadBalancer delegate(); protected abstract LoadBalancer delegate();
@Override @Override
@Deprecated
public void handleResolvedAddressGroups( public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, List<EquivalentAddressGroup> servers,
@NameResolver.ResolutionResultAttr Attributes attributes) { @NameResolver.ResolutionResultAttr Attributes attributes) {
delegate().handleResolvedAddressGroups(servers, attributes); delegate().handleResolvedAddressGroups(servers, attributes);
} }
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
delegate().handleResolvedAddresses(resolvedAddresses);
}
@Override @Override
public void handleNameResolutionError(Status error) { public void handleNameResolutionError(Status error) {
delegate().handleNameResolutionError(error); delegate().handleNameResolutionError(error);

View File

@ -87,8 +87,9 @@ final class RoundRobinLoadBalancer extends LoadBalancer {
} }
@Override @Override
public void handleResolvedAddressGroups( public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers, Attributes attributes) { List<EquivalentAddressGroup> servers = resolvedAddresses.getServers();
Attributes attributes = resolvedAddresses.getAttributes();
Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet(); Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet();
Set<EquivalentAddressGroup> latestAddrs = stripAttrs(servers); Set<EquivalentAddressGroup> latestAddrs = stripAttrs(servers);
Set<EquivalentAddressGroup> addedAddrs = setsDifference(latestAddrs, currentAddrs); Set<EquivalentAddressGroup> addedAddrs = setsDifference(latestAddrs, currentAddrs);

View File

@ -42,6 +42,7 @@ import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerProvider;
@ -184,7 +185,11 @@ public class AutoConfiguredLoadBalancerFactoryTest {
(AutoConfiguredLoadBalancer) lbf.newLoadBalancer(helper); (AutoConfiguredLoadBalancer) lbf.newLoadBalancer(helper);
LoadBalancer oldDelegate = lb.getDelegate(); LoadBalancer oldDelegate = lb.getDelegate();
lb.handleResolvedAddressGroups(servers, Attributes.EMPTY); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(servers)
.setAttributes(Attributes.EMPTY)
.build());
assertThat(lb.getDelegate()).isSameAs(oldDelegate); assertThat(lb.getDelegate()).isSameAs(oldDelegate);
} }
@ -228,7 +233,11 @@ public class AutoConfiguredLoadBalancerFactoryTest {
}; };
lb.setDelegate(testlb); lb.setDelegate(testlb);
lb.handleResolvedAddressGroups(servers, serviceConfigAttrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(servers)
.setAttributes(serviceConfigAttrs)
.build());
assertThat(lb.getDelegateProvider().getClass().getName()).isEqualTo( assertThat(lb.getDelegateProvider().getClass().getName()).isEqualTo(
"io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider"); "io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider");
@ -249,13 +258,20 @@ public class AutoConfiguredLoadBalancerFactoryTest {
AutoConfiguredLoadBalancer lb = AutoConfiguredLoadBalancer lb =
(AutoConfiguredLoadBalancer) lbf.newLoadBalancer(helper); (AutoConfiguredLoadBalancer) lbf.newLoadBalancer(helper);
lb.handleResolvedAddressGroups(servers, serviceConfigAttrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(servers)
.setAttributes(serviceConfigAttrs)
.build());
verify(testLbBalancerProvider).newLoadBalancer(same(helper)); verify(testLbBalancerProvider).newLoadBalancer(same(helper));
assertThat(lb.getDelegate()).isSameAs(testLbBalancer); assertThat(lb.getDelegate()).isSameAs(testLbBalancer);
ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(null); ArgumentCaptor<ResolvedAddresses> resultCaptor =
verify(testLbBalancer).handleResolvedAddressGroups(eq(servers), attrsCaptor.capture()); ArgumentCaptor.forClass(ResolvedAddresses.class);
assertThat(attrsCaptor.getValue().get(ATTR_LOAD_BALANCING_CONFIG)) verify(testLbBalancer).handleResolvedAddresses(resultCaptor.capture());
assertThat(resultCaptor.getValue().getServers()).containsExactlyElementsIn(servers).inOrder();
Attributes actualAttributes = resultCaptor.getValue().getAttributes();
assertThat(actualAttributes.get(ATTR_LOAD_BALANCING_CONFIG))
.isEqualTo(Collections.singletonMap("setting1", "high")); .isEqualTo(Collections.singletonMap("setting1", "high"));
verify(testLbBalancer, atLeast(0)).canHandleEmptyAddressListFromNameResolution(); verify(testLbBalancer, atLeast(0)).canHandleEmptyAddressListFromNameResolution();
verifyNoMoreInteractions(testLbBalancer); verifyNoMoreInteractions(testLbBalancer);
@ -266,12 +282,19 @@ public class AutoConfiguredLoadBalancerFactoryTest {
Attributes.newBuilder() Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig) .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig)
.build(); .build();
lb.handleResolvedAddressGroups(servers, serviceConfigAttrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(servers)
.setAttributes(serviceConfigAttrs)
.build());
verify(testLbBalancer, times(2)) resultCaptor =
.handleResolvedAddressGroups(eq(servers), attrsCaptor.capture()); ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(testLbBalancer, times(2)).handleResolvedAddresses(resultCaptor.capture());
assertThat(resultCaptor.getValue().getServers()).containsExactlyElementsIn(servers).inOrder();
actualAttributes = resultCaptor.getValue().getAttributes();
// But the balancer config is changed. // But the balancer config is changed.
assertThat(attrsCaptor.getValue().get(ATTR_LOAD_BALANCING_CONFIG)) assertThat(actualAttributes.get(ATTR_LOAD_BALANCING_CONFIG))
.isEqualTo(Collections.singletonMap("setting1", "low")); .isEqualTo(Collections.singletonMap("setting1", "low"));
// Service config didn't change policy, thus the delegateLb is not swapped // Service config didn't change policy, thus the delegateLb is not swapped
verifyNoMoreInteractions(testLbBalancer); verifyNoMoreInteractions(testLbBalancer);
@ -296,11 +319,18 @@ public class AutoConfiguredLoadBalancerFactoryTest {
(AutoConfiguredLoadBalancer) new AutoConfiguredLoadBalancerFactory( (AutoConfiguredLoadBalancer) new AutoConfiguredLoadBalancerFactory(
registry, GrpcUtil.DEFAULT_LB_POLICY).newLoadBalancer(helper); registry, GrpcUtil.DEFAULT_LB_POLICY).newLoadBalancer(helper);
lb.handleResolvedAddressGroups(servers, Attributes.EMPTY); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(servers)
.setAttributes(Attributes.EMPTY)
.build());
assertThat(lb.getDelegate()).isSameAs(testLbBalancer); assertThat(lb.getDelegate()).isSameAs(testLbBalancer);
verify(testLbBalancer).handleResolvedAddressGroups( verify(testLbBalancer).handleResolvedAddresses(
eq(Collections.singletonList(servers.get(0))), any(Attributes.class)); ResolvedAddresses.newBuilder()
.setServers(Collections.singletonList(servers.get(0)))
.setAttributes(Attributes.EMPTY)
.build());
} }
@Test @Test
@ -312,10 +342,12 @@ public class AutoConfiguredLoadBalancerFactoryTest {
Map<String, ?> serviceConfig = Map<String, ?> serviceConfig =
parseConfig("{\"loadBalancingConfig\": [ {\"test_lb\": { \"setting1\": \"high\" } } ] }"); parseConfig("{\"loadBalancingConfig\": [ {\"test_lb\": { \"setting1\": \"high\" } } ] }");
lb.handleResolvedAddressGroups( lb.handleResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(), ResolvedAddresses.newBuilder()
Attributes.newBuilder() .setServers(Collections.<EquivalentAddressGroup>emptyList())
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build()); .setAttributes(Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build())
.build());
assertThat(lb.getDelegate()).isSameAs(testLbBalancer); assertThat(lb.getDelegate()).isSameAs(testLbBalancer);
assertThat(testLbBalancer.canHandleEmptyAddressListFromNameResolution()).isFalse(); assertThat(testLbBalancer.canHandleEmptyAddressListFromNameResolution()).isFalse();
@ -335,20 +367,24 @@ public class AutoConfiguredLoadBalancerFactoryTest {
Map<String, ?> serviceConfig = Map<String, ?> serviceConfig =
parseConfig("{\"loadBalancingConfig\": [ {\"test_lb2\": { \"setting1\": \"high\" } } ] }"); parseConfig("{\"loadBalancingConfig\": [ {\"test_lb2\": { \"setting1\": \"high\" } } ] }");
lb.handleResolvedAddressGroups( lb.handleResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(), ResolvedAddresses.newBuilder()
Attributes.newBuilder() .setServers(Collections.<EquivalentAddressGroup>emptyList())
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build()); .setAttributes(Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build())
.build());
assertThat(lb.getDelegate()).isSameAs(testLbBalancer2); assertThat(lb.getDelegate()).isSameAs(testLbBalancer2);
assertThat(testLbBalancer2.canHandleEmptyAddressListFromNameResolution()).isTrue(); assertThat(testLbBalancer2.canHandleEmptyAddressListFromNameResolution()).isTrue();
ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(null); ArgumentCaptor<ResolvedAddresses> resultCaptor =
verify(testLbBalancer2).handleResolvedAddressGroups( ArgumentCaptor.forClass(ResolvedAddresses.class);
eq(Collections.<EquivalentAddressGroup>emptyList()), attrsCaptor.capture()); verify(testLbBalancer2).handleResolvedAddresses(resultCaptor.capture());
Map<String, ?> lbConfig = assertThat(resultCaptor.getValue().getServers()).isEmpty();
attrsCaptor.getValue().get(LoadBalancer.ATTR_LOAD_BALANCING_CONFIG); Attributes actualAttributes = resultCaptor.getValue().getAttributes();
Map<String, ?> lbConfig = actualAttributes.get(LoadBalancer.ATTR_LOAD_BALANCING_CONFIG);
assertThat(lbConfig).isEqualTo(Collections.<String, Object>singletonMap("setting1", "high")); assertThat(lbConfig).isEqualTo(Collections.<String, Object>singletonMap("setting1", "high"));
assertThat(attrsCaptor.getValue().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)) assertThat(actualAttributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG))
.isSameAs(serviceConfig); .isSameAs(serviceConfig);
} }
@ -701,15 +737,22 @@ public class AutoConfiguredLoadBalancerFactoryTest {
LoadBalancer lb = new AutoConfiguredLoadBalancerFactory(GrpcUtil.DEFAULT_LB_POLICY) LoadBalancer lb = new AutoConfiguredLoadBalancerFactory(GrpcUtil.DEFAULT_LB_POLICY)
.newLoadBalancer(helper); .newLoadBalancer(helper);
lb.handleResolvedAddressGroups(servers, Attributes.EMPTY); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(servers)
.setAttributes(Attributes.EMPTY)
.build());
verifyNoMoreInteractions(channelLogger); verifyNoMoreInteractions(channelLogger);
Map<String, String> serviceConfig = new HashMap<>(); Map<String, String> serviceConfig = new HashMap<>();
serviceConfig.put("loadBalancingPolicy", "round_robin"); serviceConfig.put("loadBalancingPolicy", "round_robin");
lb.handleResolvedAddressGroups(servers, lb.handleResolvedAddresses(
Attributes.newBuilder() ResolvedAddresses.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build()); .setServers(servers)
.setAttributes(Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build())
.build());
verify(channelLogger).log( verify(channelLogger).log(
eq(ChannelLogLevel.INFO), eq(ChannelLogLevel.INFO),
@ -722,9 +765,12 @@ public class AutoConfiguredLoadBalancerFactoryTest {
verifyNoMoreInteractions(channelLogger); verifyNoMoreInteractions(channelLogger);
serviceConfig.put("loadBalancingPolicy", "round_robin"); serviceConfig.put("loadBalancingPolicy", "round_robin");
lb.handleResolvedAddressGroups(servers, lb.handleResolvedAddresses(
Attributes.newBuilder() ResolvedAddresses.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build()); .setServers(servers)
.setAttributes(Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build())
.build());
verify(channelLogger, times(2)).log( verify(channelLogger, times(2)).log(
eq(ChannelLogLevel.DEBUG), eq(ChannelLogLevel.DEBUG),
eq("Load-balancing config: {0}"), eq("Load-balancing config: {0}"),
@ -734,7 +780,11 @@ public class AutoConfiguredLoadBalancerFactoryTest {
servers = Collections.singletonList(new EquivalentAddressGroup( servers = Collections.singletonList(new EquivalentAddressGroup(
new SocketAddress(){}, new SocketAddress(){},
Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build())); Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()));
lb.handleResolvedAddressGroups(servers, Attributes.EMPTY); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(servers)
.setAttributes(Attributes.EMPTY)
.build());
verify(channelLogger).log( verify(channelLogger).log(
eq(ChannelLogLevel.INFO), eq(ChannelLogLevel.INFO),
@ -756,11 +806,17 @@ public class AutoConfiguredLoadBalancerFactoryTest {
} }
@Override @Override
@Deprecated
public void handleResolvedAddressGroups( public void handleResolvedAddressGroups(
List<EquivalentAddressGroup> servers, Attributes attributes) { List<EquivalentAddressGroup> servers, Attributes attributes) {
delegate().handleResolvedAddressGroups(servers, attributes); delegate().handleResolvedAddressGroups(servers, attributes);
} }
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
delegate().handleResolvedAddresses(resolvedAddresses);
}
@Override @Override
public void handleNameResolutionError(Status error) { public void handleNameResolutionError(Status error) {
delegate().handleNameResolutionError(error); delegate().handleNameResolutionError(error);

View File

@ -44,6 +44,7 @@ import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerProvider;
@ -228,7 +229,8 @@ public class ManagedChannelImplIdlenessTest {
.setAttributes(Attributes.EMPTY) .setAttributes(Attributes.EMPTY)
.build(); .build();
nameResolverObserverCaptor.getValue().onResult(resolutionResult); nameResolverObserverCaptor.getValue().onResult(resolutionResult);
verify(mockLoadBalancer).handleResolvedAddressGroups(servers, Attributes.EMPTY); verify(mockLoadBalancer).handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(Attributes.EMPTY).build());
} }
@Test @Test

View File

@ -81,6 +81,7 @@ import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerProvider;
@ -140,7 +141,6 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor; import org.mockito.Captor;
import org.mockito.InOrder; import org.mockito.InOrder;
import org.mockito.Mock; import org.mockito.Mock;
@ -651,8 +651,11 @@ public class ManagedChannelImplTest {
FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0);
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
verify(mockLoadBalancer).handleResolvedAddressGroups( verify(mockLoadBalancer).handleResolvedAddresses(
eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY)); ResolvedAddresses.newBuilder()
.setServers(Arrays.asList(addressGroup))
.setAttributes(Attributes.EMPTY)
.build());
Subchannel subchannel1 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); Subchannel subchannel1 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
Subchannel subchannel2 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); Subchannel subchannel2 = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
@ -890,14 +893,15 @@ public class ManagedChannelImplTest {
// LoadBalancer received the empty list and the LB config // LoadBalancer received the empty list and the LB config
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
ArgumentCaptor<Attributes> attrsCaptor = ArgumentCaptor.forClass(null); ArgumentCaptor<ResolvedAddresses> resultCaptor =
verify(mockLoadBalancer).handleResolvedAddressGroups( ArgumentCaptor.forClass(ResolvedAddresses.class);
eq(ImmutableList.<EquivalentAddressGroup>of()), attrsCaptor.capture()); verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture());
Map<String, ?> lbConfig = assertThat(resultCaptor.getValue().getServers()).isEmpty();
attrsCaptor.getValue().get(LoadBalancer.ATTR_LOAD_BALANCING_CONFIG); Attributes actualAttrs = resultCaptor.getValue().getAttributes();
assertEquals(ImmutableMap.<String, String>of("setting1", "high"), lbConfig); Map<String, ?> lbConfig = actualAttrs.get(LoadBalancer.ATTR_LOAD_BALANCING_CONFIG);
assertEquals(ImmutableMap.of("setting1", "high"), lbConfig);
assertSame( assertSame(
serviceConfig, attrsCaptor.getValue().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)); serviceConfig, actualAttrs.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG));
} }
@Test @Test
@ -960,8 +964,12 @@ public class ManagedChannelImplTest {
// LoadBalancer received the empty list and the LB config // LoadBalancer received the empty list and the LB config
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
verify(mockLoadBalancer).handleResolvedAddressGroups( verify(mockLoadBalancer).handleResolvedAddresses(
eq(ImmutableList.<EquivalentAddressGroup>of()), same(serviceConfigAttrs)); ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(serviceConfigAttrs)
.build());
} }
@Test @Test
@ -977,8 +985,7 @@ public class ManagedChannelImplTest {
createChannel(); createChannel();
verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class)); verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
doThrow(ex).when(mockLoadBalancer).handleResolvedAddressGroups( doThrow(ex).when(mockLoadBalancer).handleResolvedAddresses(any(ResolvedAddresses.class));
ArgumentMatchers.<List<EquivalentAddressGroup>>any(), any(Attributes.class));
// NameResolver returns addresses. // NameResolver returns addresses.
nameResolverFactory.allResolved(); nameResolverFactory.allResolved();
@ -1040,8 +1047,11 @@ public class ManagedChannelImplTest {
// Simulate name resolution results // Simulate name resolution results
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs); EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs);
inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups( inOrder.verify(mockLoadBalancer).handleResolvedAddresses(
eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY)); ResolvedAddresses.newBuilder()
.setServers(Arrays.asList(addressGroup))
.setAttributes(Attributes.EMPTY)
.build());
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel)); .thenReturn(PickResult.withSubchannel(subchannel));
@ -1188,8 +1198,11 @@ public class ManagedChannelImplTest {
// Simulate name resolution results // Simulate name resolution results
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs); EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(resolvedAddrs);
inOrder.verify(mockLoadBalancer).handleResolvedAddressGroups( inOrder.verify(mockLoadBalancer).handleResolvedAddresses(
eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY)); ResolvedAddresses.newBuilder()
.setServers(Arrays.asList(addressGroup))
.setAttributes(Attributes.EMPTY)
.build());
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel)); .thenReturn(PickResult.withSubchannel(subchannel));
@ -3044,9 +3057,11 @@ public class ManagedChannelImplTest {
ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
helper = helperCaptor.getValue(); helper = helperCaptor.getValue();
verify(mockLoadBalancer) verify(mockLoadBalancer).handleResolvedAddresses(
.handleResolvedAddressGroups( ResolvedAddresses.newBuilder()
eq(nameResolverFactory.servers), same(attributesWithRetryPolicy)); .setServers(nameResolverFactory.servers)
.setAttributes(attributesWithRetryPolicy)
.build());
// simulating request connection and then transport ready after resolved address // simulating request connection and then transport ready after resolved address
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY); Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
@ -3141,8 +3156,11 @@ public class ManagedChannelImplTest {
ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class); ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture()); verify(mockLoadBalancerProvider).newLoadBalancer(helperCaptor.capture());
helper = helperCaptor.getValue(); helper = helperCaptor.getValue();
verify(mockLoadBalancer) verify(mockLoadBalancer).handleResolvedAddresses(
.handleResolvedAddressGroups(nameResolverFactory.servers, attributesWithRetryPolicy); ResolvedAddresses.newBuilder()
.setServers(nameResolverFactory.servers)
.setAttributes(attributesWithRetryPolicy)
.build());
// simulating request connection and then transport ready after resolved address // simulating request connection and then transport ready after resolved address
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
@ -3510,12 +3528,13 @@ public class ManagedChannelImplTest {
nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs); nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs);
createChannel(); createChannel();
ArgumentCaptor<Attributes> attributesCaptor = ArgumentCaptor.forClass(Attributes.class);
verify(mockLoadBalancer).handleResolvedAddressGroups( ArgumentCaptor<ResolvedAddresses> resultCaptor =
eq(ImmutableList.of(addressGroup)), ArgumentCaptor.forClass(ResolvedAddresses.class);
attributesCaptor.capture()); verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture());
assertThat(attributesCaptor.getValue().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)) assertThat(resultCaptor.getValue().getServers()).containsExactly(addressGroup);
.isNull(); Attributes actualAttrs = resultCaptor.getValue().getAttributes();
assertThat(actualAttrs.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)).isNull();
verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
} finally { } finally {
LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider); LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider);
@ -3545,11 +3564,14 @@ public class ManagedChannelImplTest {
nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs); nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs);
createChannel(); createChannel();
ArgumentCaptor<Attributes> attributesCaptor = ArgumentCaptor.forClass(Attributes.class);
verify(mockLoadBalancer).handleResolvedAddressGroups( ArgumentCaptor<ResolvedAddresses> resultCaptor =
eq(ImmutableList.of(addressGroup)), ArgumentCaptor.forClass(ResolvedAddresses.class);
attributesCaptor.capture()); verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture());
assertThat(attributesCaptor.getValue().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)) assertThat(resultCaptor.getValue().getServers()).containsExactly(addressGroup);
Attributes actualAttrs = resultCaptor.getValue().getAttributes();
assertThat(actualAttrs.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG))
.isEqualTo(defaultServiceConfig); .isEqualTo(defaultServiceConfig);
verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
} finally { } finally {
@ -3577,11 +3599,13 @@ public class ManagedChannelImplTest {
nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs); nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs);
createChannel(); createChannel();
ArgumentCaptor<Attributes> attributesCaptor = ArgumentCaptor.forClass(Attributes.class); ArgumentCaptor<ResolvedAddresses> resultCaptor =
verify(mockLoadBalancer).handleResolvedAddressGroups( ArgumentCaptor.forClass(ResolvedAddresses.class);
eq(ImmutableList.of(addressGroup)), verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture());
attributesCaptor.capture()); assertThat(resultCaptor.getValue().getServers()).containsExactly(addressGroup);
assertThat(attributesCaptor.getValue().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)) Attributes actualAttrs = resultCaptor.getValue().getAttributes();
assertThat(actualAttrs.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG))
.isEqualTo(serviceConfig); .isEqualTo(serviceConfig);
verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
@ -3597,11 +3621,11 @@ public class ManagedChannelImplTest {
nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs); nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs);
nameResolverFactory.allResolved(); nameResolverFactory.allResolved();
attributesCaptor = ArgumentCaptor.forClass(Attributes.class); resultCaptor = ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(mockLoadBalancer, times(2)).handleResolvedAddressGroups( verify(mockLoadBalancer, times(2)).handleResolvedAddresses(resultCaptor.capture());
eq(ImmutableList.of(addressGroup)), assertThat(resultCaptor.getValue().getServers()).containsExactly(addressGroup);
attributesCaptor.capture()); actualAttrs = resultCaptor.getValue().getAttributes();
assertThat(attributesCaptor.getValue().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)) assertThat(actualAttrs.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG))
.isEqualTo(serviceConfig); .isEqualTo(serviceConfig);
verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
} finally { } finally {
@ -3634,11 +3658,12 @@ public class ManagedChannelImplTest {
nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs); nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs);
createChannel(); createChannel();
ArgumentCaptor<Attributes> attributesCaptor = ArgumentCaptor.forClass(Attributes.class); ArgumentCaptor<ResolvedAddresses> resultCaptor =
verify(mockLoadBalancer).handleResolvedAddressGroups( ArgumentCaptor.forClass(ResolvedAddresses.class);
eq(ImmutableList.of(addressGroup)), verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture());
attributesCaptor.capture()); assertThat(resultCaptor.getValue().getServers()).containsExactly(addressGroup);
assertThat(attributesCaptor.getValue().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)) Attributes actualAttrs = resultCaptor.getValue().getAttributes();
assertThat(actualAttrs.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG))
.isEqualTo(serviceConfig); .isEqualTo(serviceConfig);
verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
} finally { } finally {
@ -3665,11 +3690,12 @@ public class ManagedChannelImplTest {
nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs); nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs);
createChannel(); createChannel();
ArgumentCaptor<Attributes> attributesCaptor = ArgumentCaptor.forClass(Attributes.class); ArgumentCaptor<ResolvedAddresses> resultCaptor =
verify(mockLoadBalancer).handleResolvedAddressGroups( ArgumentCaptor.forClass(ResolvedAddresses.class);
eq(ImmutableList.of(addressGroup)), verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture());
attributesCaptor.capture()); assertThat(resultCaptor.getValue().getServers()).containsExactly(addressGroup);
assertThat(attributesCaptor.getValue().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)) Attributes actualAttrs = resultCaptor.getValue().getAttributes();
assertThat(actualAttrs.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG))
.isEqualTo(defaultServiceConfig); .isEqualTo(defaultServiceConfig);
verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
} finally { } finally {
@ -3690,12 +3716,12 @@ public class ManagedChannelImplTest {
nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs); nameResolverFactory.nextResolvedAttributes.set(serviceConfigAttrs);
createChannel(); createChannel();
ArgumentCaptor<Attributes> attributesCaptor = ArgumentCaptor.forClass(Attributes.class); ArgumentCaptor<ResolvedAddresses> resultCaptor =
verify(mockLoadBalancer).handleResolvedAddressGroups( ArgumentCaptor.forClass(ResolvedAddresses.class);
eq(ImmutableList.of(addressGroup)), verify(mockLoadBalancer).handleResolvedAddresses(resultCaptor.capture());
attributesCaptor.capture()); assertThat(resultCaptor.getValue().getServers()).containsExactly(addressGroup);
assertThat(attributesCaptor.getValue().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)) Attributes actualAttrs = resultCaptor.getValue().getAttributes();
.isNull(); assertThat(actualAttrs.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG)).isNull();
verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class)); verify(mockLoadBalancer, never()).handleNameResolutionError(any(Status.class));
} finally { } finally {
LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider); LoadBalancerRegistry.getDefaultRegistry().deregister(mockLoadBalancerProvider);

View File

@ -40,6 +40,7 @@ import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Status; import io.grpc.Status;
@ -105,7 +106,8 @@ public class PickFirstLoadBalancerTest {
@Test @Test
public void pickAfterResolved() throws Exception { public void pickAfterResolved() throws Exception {
loadBalancer.handleResolvedAddressGroups(servers, affinity); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(affinity).build());
verify(mockHelper).createSubchannel(eq(servers), attrsCaptor.capture()); verify(mockHelper).createSubchannel(eq(servers), attrsCaptor.capture());
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
@ -119,9 +121,11 @@ public class PickFirstLoadBalancerTest {
@Test @Test
public void pickAfterResolvedAndUnchanged() throws Exception { public void pickAfterResolvedAndUnchanged() throws Exception {
loadBalancer.handleResolvedAddressGroups(servers, affinity); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(affinity).build());
verify(mockSubchannel).requestConnection(); verify(mockSubchannel).requestConnection();
loadBalancer.handleResolvedAddressGroups(servers, affinity); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(affinity).build());
verifyNoMoreInteractions(mockSubchannel); verifyNoMoreInteractions(mockSubchannel);
verify(mockHelper).createSubchannel(ArgumentMatchers.<EquivalentAddressGroup>anyList(), verify(mockHelper).createSubchannel(ArgumentMatchers.<EquivalentAddressGroup>anyList(),
@ -143,13 +147,15 @@ public class PickFirstLoadBalancerTest {
InOrder inOrder = inOrder(mockHelper); InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleResolvedAddressGroups(servers, affinity); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(affinity).build());
inOrder.verify(mockHelper).createSubchannel(eq(servers), any(Attributes.class)); inOrder.verify(mockHelper).createSubchannel(eq(servers), any(Attributes.class));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection(); verify(mockSubchannel).requestConnection();
assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
loadBalancer.handleResolvedAddressGroups(newServers, affinity); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(newServers).setAttributes(affinity).build());
inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eq(newServers)); inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eq(newServers));
verifyNoMoreInteractions(mockSubchannel); verifyNoMoreInteractions(mockSubchannel);
@ -165,7 +171,8 @@ public class PickFirstLoadBalancerTest {
@Test @Test
public void pickAfterStateChangeAfterResolution() throws Exception { public void pickAfterStateChangeAfterResolution() throws Exception {
loadBalancer.handleResolvedAddressGroups(servers, affinity); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(affinity).build());
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel(); Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel();
reset(mockHelper); reset(mockHelper);
@ -210,7 +217,8 @@ public class PickFirstLoadBalancerTest {
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
verify(mockSubchannel, never()).requestConnection(); verify(mockSubchannel, never()).requestConnection();
loadBalancer.handleResolvedAddressGroups(servers, affinity); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(affinity).build());
inOrder.verify(mockHelper).createSubchannel(eq(servers), eq(Attributes.EMPTY)); inOrder.verify(mockHelper).createSubchannel(eq(servers), eq(Attributes.EMPTY));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection(); verify(mockSubchannel).requestConnection();
@ -252,7 +260,8 @@ public class PickFirstLoadBalancerTest {
@Test @Test
public void requestConnection() { public void requestConnection() {
loadBalancer.handleResolvedAddressGroups(servers, affinity); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(affinity).build());
loadBalancer.handleSubchannelState(mockSubchannel, ConnectivityStateInfo.forNonError(IDLE)); loadBalancer.handleSubchannelState(mockSubchannel, ConnectivityStateInfo.forNonError(IDLE));
verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue(); SubchannelPicker picker = pickerCaptor.getValue();

View File

@ -54,6 +54,7 @@ import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Metadata; import io.grpc.Metadata;
@ -144,7 +145,8 @@ public class RoundRobinLoadBalancerTest {
@Test @Test
public void pickAfterResolved() throws Exception { public void pickAfterResolved() throws Exception {
final Subchannel readySubchannel = subchannels.values().iterator().next(); final Subchannel readySubchannel = subchannels.values().iterator().next();
loadBalancer.handleResolvedAddressGroups(servers, affinity); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(affinity).build());
loadBalancer.handleSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY)); loadBalancer.handleSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
verify(mockHelper, times(3)).createSubchannel(eagListCaptor.capture(), verify(mockHelper, times(3)).createSubchannel(eagListCaptor.capture(),
@ -208,7 +210,8 @@ public class RoundRobinLoadBalancerTest {
} }
}).when(mockHelper).createSubchannel(any(List.class), any(Attributes.class)); }).when(mockHelper).createSubchannel(any(List.class), any(Attributes.class));
loadBalancer.handleResolvedAddressGroups(currentServers, affinity); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(currentServers).setAttributes(affinity).build());
InOrder inOrder = inOrder(mockHelper); InOrder inOrder = inOrder(mockHelper);
@ -230,8 +233,9 @@ public class RoundRobinLoadBalancerTest {
Lists.newArrayList( Lists.newArrayList(
new EquivalentAddressGroup(oldAddr), new EquivalentAddressGroup(oldAddr),
new EquivalentAddressGroup(newAddr)); new EquivalentAddressGroup(newAddr));
;
loadBalancer.handleResolvedAddressGroups(latestServers, affinity); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(latestServers).setAttributes(affinity).build());
verify(newSubchannel, times(1)).requestConnection(); verify(newSubchannel, times(1)).requestConnection();
verify(removedSubchannel, times(1)).shutdown(); verify(removedSubchannel, times(1)).shutdown();
@ -249,8 +253,11 @@ public class RoundRobinLoadBalancerTest {
assertThat(getList(picker)).containsExactly(oldSubchannel, newSubchannel); assertThat(getList(picker)).containsExactly(oldSubchannel, newSubchannel);
// test going from non-empty to empty // test going from non-empty to empty
loadBalancer.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), loadBalancer.handleResolvedAddresses(
affinity); ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(affinity)
.build());
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertEquals(PickResult.withNoResult(), pickerCaptor.getValue().pickSubchannel(mockArgs)); assertEquals(PickResult.withNoResult(), pickerCaptor.getValue().pickSubchannel(mockArgs));
@ -262,7 +269,8 @@ public class RoundRobinLoadBalancerTest {
@Test @Test
public void pickAfterStateChange() throws Exception { public void pickAfterStateChange() throws Exception {
InOrder inOrder = inOrder(mockHelper); InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleResolvedAddressGroups(servers, Attributes.EMPTY); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(Attributes.EMPTY).build());
Subchannel subchannel = loadBalancer.getSubchannels().iterator().next(); Subchannel subchannel = loadBalancer.getSubchannels().iterator().next();
Ref<ConnectivityStateInfo> subchannelStateInfo = subchannel.getAttributes().get( Ref<ConnectivityStateInfo> subchannelStateInfo = subchannel.getAttributes().get(
STATE_INFO); STATE_INFO);
@ -341,7 +349,8 @@ public class RoundRobinLoadBalancerTest {
@Test @Test
public void nameResolutionErrorWithActiveChannels() throws Exception { public void nameResolutionErrorWithActiveChannels() throws Exception {
final Subchannel readySubchannel = subchannels.values().iterator().next(); final Subchannel readySubchannel = subchannels.values().iterator().next();
loadBalancer.handleResolvedAddressGroups(servers, affinity); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(affinity).build());
loadBalancer.handleSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY)); loadBalancer.handleSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError")); loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));
@ -370,7 +379,8 @@ public class RoundRobinLoadBalancerTest {
Subchannel sc2 = subchannelIterator.next(); Subchannel sc2 = subchannelIterator.next();
Subchannel sc3 = subchannelIterator.next(); Subchannel sc3 = subchannelIterator.next();
loadBalancer.handleResolvedAddressGroups(servers, Attributes.EMPTY); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(Attributes.EMPTY).build());
verify(sc1, times(1)).requestConnection(); verify(sc1, times(1)).requestConnection();
verify(sc2, times(1)).requestConnection(); verify(sc2, times(1)).requestConnection();
verify(sc3, times(1)).requestConnection(); verify(sc3, times(1)).requestConnection();
@ -409,7 +419,8 @@ public class RoundRobinLoadBalancerTest {
@Test @Test
public void noStickinessEnabled_withStickyHeader() { public void noStickinessEnabled_withStickyHeader() {
loadBalancer.handleResolvedAddressGroups(servers, Attributes.EMPTY); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(Attributes.EMPTY).build());
for (Subchannel subchannel : subchannels.values()) { for (Subchannel subchannel : subchannels.values()) {
loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
} }
@ -442,7 +453,8 @@ public class RoundRobinLoadBalancerTest {
serviceConfig.put("stickinessMetadataKey", "my-sticky-key"); serviceConfig.put("stickinessMetadataKey", "my-sticky-key");
Attributes attributes = Attributes.newBuilder() Attributes attributes = Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build(); .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();
loadBalancer.handleResolvedAddressGroups(servers, attributes); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(attributes).build());
for (Subchannel subchannel : subchannels.values()) { for (Subchannel subchannel : subchannels.values()) {
loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
} }
@ -474,7 +486,8 @@ public class RoundRobinLoadBalancerTest {
serviceConfig.put("stickinessMetadataKey", "my-sticky-key"); serviceConfig.put("stickinessMetadataKey", "my-sticky-key");
Attributes attributes = Attributes.newBuilder() Attributes attributes = Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build(); .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();
loadBalancer.handleResolvedAddressGroups(servers, attributes); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(attributes).build());
for (Subchannel subchannel : subchannels.values()) { for (Subchannel subchannel : subchannels.values()) {
loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
} }
@ -504,7 +517,8 @@ public class RoundRobinLoadBalancerTest {
serviceConfig.put("stickinessMetadataKey", "my-sticky-key"); serviceConfig.put("stickinessMetadataKey", "my-sticky-key");
Attributes attributes = Attributes.newBuilder() Attributes attributes = Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build(); .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();
loadBalancer.handleResolvedAddressGroups(servers, attributes); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(attributes).build());
for (Subchannel subchannel : subchannels.values()) { for (Subchannel subchannel : subchannels.values()) {
loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
} }
@ -549,7 +563,8 @@ public class RoundRobinLoadBalancerTest {
serviceConfig.put("stickinessMetadataKey", "my-sticky-key"); serviceConfig.put("stickinessMetadataKey", "my-sticky-key");
Attributes attributes = Attributes.newBuilder() Attributes attributes = Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build(); .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();
loadBalancer.handleResolvedAddressGroups(servers, attributes); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(attributes).build());
for (Subchannel subchannel : subchannels.values()) { for (Subchannel subchannel : subchannels.values()) {
loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
} }
@ -597,7 +612,8 @@ public class RoundRobinLoadBalancerTest {
serviceConfig.put("stickinessMetadataKey", "my-sticky-key"); serviceConfig.put("stickinessMetadataKey", "my-sticky-key");
Attributes attributes = Attributes.newBuilder() Attributes attributes = Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build(); .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();
loadBalancer.handleResolvedAddressGroups(servers, attributes); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(attributes).build());
for (Subchannel subchannel : subchannels.values()) { for (Subchannel subchannel : subchannels.values()) {
loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
} }
@ -651,7 +667,8 @@ public class RoundRobinLoadBalancerTest {
serviceConfig.put("stickinessMetadataKey", "my-sticky-key"); serviceConfig.put("stickinessMetadataKey", "my-sticky-key");
Attributes attributes = Attributes.newBuilder() Attributes attributes = Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build(); .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build();
loadBalancer.handleResolvedAddressGroups(servers, attributes); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(attributes).build());
for (Subchannel subchannel : subchannels.values()) { for (Subchannel subchannel : subchannels.values()) {
loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
} }
@ -687,7 +704,8 @@ public class RoundRobinLoadBalancerTest {
List<EquivalentAddressGroup> newServers = new ArrayList<>(servers); List<EquivalentAddressGroup> newServers = new ArrayList<>(servers);
newServers.remove(sc2.getAddresses()); newServers.remove(sc2.getAddresses());
loadBalancer.handleResolvedAddressGroups(newServers, attributes); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(newServers).setAttributes(attributes).build());
verify(sc2, times(1)).shutdown(); verify(sc2, times(1)).shutdown();
@ -707,14 +725,16 @@ public class RoundRobinLoadBalancerTest {
serviceConfig1.put("stickinessMetadataKey", "my-sticky-key1"); serviceConfig1.put("stickinessMetadataKey", "my-sticky-key1");
Attributes attributes1 = Attributes.newBuilder() Attributes attributes1 = Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig1).build(); .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig1).build();
loadBalancer.handleResolvedAddressGroups(servers, attributes1); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(attributes1).build());
Map<String, ?> stickinessMap1 = loadBalancer.getStickinessMapForTest(); Map<String, ?> stickinessMap1 = loadBalancer.getStickinessMapForTest();
Map<String, String> serviceConfig2 = new HashMap<>(); Map<String, String> serviceConfig2 = new HashMap<>();
serviceConfig2.put("stickinessMetadataKey", "my-sticky-key2"); serviceConfig2.put("stickinessMetadataKey", "my-sticky-key2");
Attributes attributes2 = Attributes.newBuilder() Attributes attributes2 = Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig2).build(); .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig2).build();
loadBalancer.handleResolvedAddressGroups(servers, attributes2); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(attributes2).build());
Map<String, ?> stickinessMap2 = loadBalancer.getStickinessMapForTest(); Map<String, ?> stickinessMap2 = loadBalancer.getStickinessMapForTest();
assertNotSame(stickinessMap1, stickinessMap2); assertNotSame(stickinessMap1, stickinessMap2);
@ -726,10 +746,12 @@ public class RoundRobinLoadBalancerTest {
serviceConfig1.put("stickinessMetadataKey", "my-sticky-key1"); serviceConfig1.put("stickinessMetadataKey", "my-sticky-key1");
Attributes attributes1 = Attributes.newBuilder() Attributes attributes1 = Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig1).build(); .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig1).build();
loadBalancer.handleResolvedAddressGroups(servers, attributes1); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(attributes1).build());
Map<String, ?> stickinessMap1 = loadBalancer.getStickinessMapForTest(); Map<String, ?> stickinessMap1 = loadBalancer.getStickinessMapForTest();
loadBalancer.handleResolvedAddressGroups(servers, attributes1); loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(servers).setAttributes(attributes1).build());
Map<String, ?> stickinessMap2 = loadBalancer.getStickinessMapForTest(); Map<String, ?> stickinessMap2 = loadBalancer.getStickinessMapForTest();
assertSame(stickinessMap1, stickinessMap2); assertSame(stickinessMap1, stickinessMap2);

View File

@ -88,8 +88,9 @@ class GrpclbLoadBalancer extends LoadBalancer {
} }
@Override @Override
public void handleResolvedAddressGroups( public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> updatedServers, Attributes attributes) { List<EquivalentAddressGroup> updatedServers = resolvedAddresses.getServers();
Attributes attributes = resolvedAddresses.getAttributes();
// LB addresses and backend addresses are treated separately // LB addresses and backend addresses are treated separately
List<LbAddressGroup> newLbAddressGroups = new ArrayList<>(); List<LbAddressGroup> newLbAddressGroups = new ArrayList<>();
List<EquivalentAddressGroup> newBackendServers = new ArrayList<>(); List<EquivalentAddressGroup> newBackendServers = new ArrayList<>();

View File

@ -60,6 +60,7 @@ import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
@ -2099,7 +2100,8 @@ public class GrpclbLoadBalancerTest {
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
balancer.handleResolvedAddressGroups(addrs, attrs); balancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setServers(addrs).setAttributes(attrs).build());
} }
}); });
} }

View File

@ -169,13 +169,12 @@ final class HealthCheckingLoadBalancerFactory extends Factory {
} }
@Override @Override
public void handleResolvedAddressGroups( public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers, Attributes attributes) {
Map<String, ?> serviceConfig = Map<String, ?> serviceConfig =
attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); resolvedAddresses.getAttributes().get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(serviceConfig); String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(serviceConfig);
helper.setHealthCheckedService(serviceName); helper.setHealthCheckedService(serviceName);
super.handleResolvedAddressGroups(servers, attributes); super.handleResolvedAddresses(resolvedAddresses);
} }
@Override @Override

View File

@ -49,6 +49,7 @@ import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Factory; import io.grpc.LoadBalancer.Factory;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
@ -187,13 +188,12 @@ public class HealthCheckingLoadBalancerFactoryTest {
boolean shutdown; boolean shutdown;
@Override @Override
public void handleResolvedAddressGroups( public void handleResolvedAddresses(final ResolvedAddresses resolvedAddresses) {
final List<EquivalentAddressGroup> servers, final Attributes attributes) {
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
if (!shutdown) { if (!shutdown) {
hcLb.handleResolvedAddressGroups(servers, attributes); hcLb.handleResolvedAddresses(resolvedAddresses);
} }
} }
}); });
@ -265,9 +265,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Test @Test
public void typicalWorkflow() { public void typicalWorkflow() {
Attributes resolutionAttrs = attrsWithHealthCheckService("FooService"); Attributes resolutionAttrs = attrsWithHealthCheckService("FooService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); verify(origLb).handleResolvedAddresses(result);
verify(origHelper, atLeast(0)).getSynchronizationContext(); verify(origHelper, atLeast(0)).getSynchronizationContext();
verify(origHelper, atLeast(0)).getScheduledExecutorService(); verify(origHelper, atLeast(0)).getScheduledExecutorService();
verifyNoMoreInteractions(origHelper); verifyNoMoreInteractions(origHelper);
@ -372,9 +376,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Test @Test
public void healthCheckDisabledWhenServiceNotImplemented() { public void healthCheckDisabledWhenServiceNotImplemented() {
Attributes resolutionAttrs = attrsWithHealthCheckService("BarService"); Attributes resolutionAttrs = attrsWithHealthCheckService("BarService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); verify(origLb).handleResolvedAddresses(result);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
// We create 2 Subchannels. One of them connects to a server that doesn't implement health check // We create 2 Subchannels. One of them connects to a server that doesn't implement health check
@ -441,9 +449,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Test @Test
public void backoffRetriesWhenServerErroneouslyClosesRpcBeforeAnyResponse() { public void backoffRetriesWhenServerErroneouslyClosesRpcBeforeAnyResponse() {
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); verify(origLb).handleResolvedAddresses(result);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
FakeSubchannel subchannel = (FakeSubchannel) createSubchannel(0, Attributes.EMPTY); FakeSubchannel subchannel = (FakeSubchannel) createSubchannel(0, Attributes.EMPTY);
@ -509,9 +521,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Test @Test
public void serverRespondResetsBackoff() { public void serverRespondResetsBackoff() {
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); verify(origLb).handleResolvedAddresses(result);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
@ -599,9 +615,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Test @Test
public void serviceConfigHasNoHealthCheckingInitiallyButDoesLater() { public void serviceConfigHasNoHealthCheckingInitiallyButDoesLater() {
// No service config, thus no health check. // No service config, thus no health check.
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY); ResolvedAddresses result1 = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(Attributes.EMPTY)
.build();
hcLbEventDelivery.handleResolvedAddresses(result1);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(Attributes.EMPTY)); verify(origLb).handleResolvedAddresses(result1);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
// First, create Subchannels 0 // First, create Subchannels 0
@ -618,9 +638,12 @@ public class HealthCheckingLoadBalancerFactoryTest {
// Service config enables health check // Service config enables health check
Attributes resolutionAttrs = attrsWithHealthCheckService("FooService"); Attributes resolutionAttrs = attrsWithHealthCheckService("FooService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result2 = ResolvedAddresses.newBuilder()
verify(origLb).handleResolvedAddressGroups( .setServers(resolvedAddressList)
same(resolvedAddressList), same(resolutionAttrs)); .setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
verify(origLb).handleResolvedAddresses(result2);
// Health check started on existing Subchannel // Health check started on existing Subchannel
assertThat(healthImpls[0].calls).hasSize(1); assertThat(healthImpls[0].calls).hasSize(1);
@ -639,9 +662,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Test @Test
public void serviceConfigDisablesHealthCheckWhenRpcActive() { public void serviceConfigDisablesHealthCheckWhenRpcActive() {
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result1 = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result1);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); verify(origLb).handleResolvedAddresses(result1);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
@ -658,7 +685,11 @@ public class HealthCheckingLoadBalancerFactoryTest {
assertThat(serverCall.cancelled).isFalse(); assertThat(serverCall.cancelled).isFalse();
// NameResolver gives an update without service config, thus health check will be disabled // NameResolver gives an update without service config, thus health check will be disabled
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY); ResolvedAddresses result2 = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(Attributes.EMPTY)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
// Health check RPC cancelled. // Health check RPC cancelled.
assertThat(serverCall.cancelled).isTrue(); assertThat(serverCall.cancelled).isTrue();
@ -666,8 +697,7 @@ public class HealthCheckingLoadBalancerFactoryTest {
inOrder.verify(origLb).handleSubchannelState( inOrder.verify(origLb).handleSubchannelState(
same(subchannel), eq(ConnectivityStateInfo.forNonError(READY))); same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
inOrder.verify(origLb).handleResolvedAddressGroups( inOrder.verify(origLb).handleResolvedAddresses(result2);
same(resolvedAddressList), same(Attributes.EMPTY));
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
assertThat(healthImpl.calls).isEmpty(); assertThat(healthImpl.calls).isEmpty();
@ -676,9 +706,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Test @Test
public void serviceConfigDisablesHealthCheckWhenRetryPending() { public void serviceConfigDisablesHealthCheckWhenRetryPending() {
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); verify(origLb).handleResolvedAddresses(result);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
@ -702,7 +736,11 @@ public class HealthCheckingLoadBalancerFactoryTest {
"Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'")); "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'"));
// NameResolver gives an update without service config, thus health check will be disabled // NameResolver gives an update without service config, thus health check will be disabled
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY); ResolvedAddresses result2 = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(Attributes.EMPTY)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
// Retry timer is cancelled // Retry timer is cancelled
assertThat(clock.getPendingTasks()).isEmpty(); assertThat(clock.getPendingTasks()).isEmpty();
@ -714,8 +752,7 @@ public class HealthCheckingLoadBalancerFactoryTest {
inOrder.verify(origLb).handleSubchannelState( inOrder.verify(origLb).handleSubchannelState(
same(subchannel), eq(ConnectivityStateInfo.forNonError(READY))); same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
inOrder.verify(origLb).handleResolvedAddressGroups( inOrder.verify(origLb).handleResolvedAddresses(result2);
same(resolvedAddressList), same(Attributes.EMPTY));
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
} }
@ -723,9 +760,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Test @Test
public void serviceConfigDisablesHealthCheckWhenRpcInactive() { public void serviceConfigDisablesHealthCheckWhenRpcInactive() {
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result1 = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result1);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); verify(origLb).handleResolvedAddresses(result1);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
@ -741,10 +782,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// NameResolver gives an update without service config, thus health check will be disabled // NameResolver gives an update without service config, thus health check will be disabled
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY); ResolvedAddresses result2 = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(Attributes.EMPTY)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
inOrder.verify(origLb).handleResolvedAddressGroups( inOrder.verify(origLb).handleResolvedAddresses(result2);
same(resolvedAddressList), same(Attributes.EMPTY));
// Underlying subchannel is now ready // Underlying subchannel is now ready
hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
@ -762,9 +806,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Test @Test
public void serviceConfigChangesServiceNameWhenRpcActive() { public void serviceConfigChangesServiceNameWhenRpcActive() {
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result1 = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result1);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); verify(origLb).handleResolvedAddresses(result1);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
@ -787,18 +835,19 @@ public class HealthCheckingLoadBalancerFactoryTest {
same(subchannel), eq(ConnectivityStateInfo.forNonError(READY))); same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
// Service config returns with the same health check name. // Service config returns with the same health check name.
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); hcLbEventDelivery.handleResolvedAddresses(result1);
// It's delivered to origLb, but nothing else happens // It's delivered to origLb, but nothing else happens
inOrder.verify(origLb).handleResolvedAddressGroups( inOrder.verify(origLb).handleResolvedAddresses(result1);
same(resolvedAddressList), same(resolutionAttrs));
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
// Service config returns a different health check name. // Service config returns a different health check name.
resolutionAttrs = attrsWithHealthCheckService("FooService"); resolutionAttrs = attrsWithHealthCheckService("FooService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result2 = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
inOrder.verify(origLb).handleResolvedAddressGroups( .setAttributes(resolutionAttrs)
same(resolvedAddressList), same(resolutionAttrs)); .build();
hcLbEventDelivery.handleResolvedAddresses(result2);
inOrder.verify(origLb).handleResolvedAddresses(result2);
// Current health check RPC cancelled. // Current health check RPC cancelled.
assertThat(serverCall.cancelled).isTrue(); assertThat(serverCall.cancelled).isTrue();
@ -816,9 +865,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Test @Test
public void serviceConfigChangesServiceNameWhenRetryPending() { public void serviceConfigChangesServiceNameWhenRetryPending() {
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result1 = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result1);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); verify(origLb).handleResolvedAddresses(result1);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
@ -846,24 +899,27 @@ public class HealthCheckingLoadBalancerFactoryTest {
"Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'")); "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'"));
// Service config returns with the same health check name. // Service config returns with the same health check name.
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
hcLbEventDelivery.handleResolvedAddresses(result1);
// It's delivered to origLb, but nothing else happens // It's delivered to origLb, but nothing else happens
inOrder.verify(origLb).handleResolvedAddressGroups( inOrder.verify(origLb).handleResolvedAddresses(result1);
same(resolvedAddressList), same(resolutionAttrs));
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
assertThat(clock.getPendingTasks()).hasSize(1); assertThat(clock.getPendingTasks()).hasSize(1);
assertThat(healthImpl.calls).isEmpty(); assertThat(healthImpl.calls).isEmpty();
// Service config returns a different health check name. // Service config returns a different health check name.
resolutionAttrs = attrsWithHealthCheckService("FooService"); resolutionAttrs = attrsWithHealthCheckService("FooService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result2 = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
// Concluded CONNECTING state // Concluded CONNECTING state
inOrder.verify(origLb).handleSubchannelState( inOrder.verify(origLb).handleSubchannelState(
same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
inOrder.verify(origLb).handleResolvedAddressGroups( inOrder.verify(origLb).handleResolvedAddresses(result2);
same(resolvedAddressList), same(resolutionAttrs));
// Current retry timer cancelled // Current retry timer cancelled
assertThat(clock.getPendingTasks()).isEmpty(); assertThat(clock.getPendingTasks()).isEmpty();
@ -880,9 +936,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Test @Test
public void serviceConfigChangesServiceNameWhenRpcInactive() { public void serviceConfigChangesServiceNameWhenRpcInactive() {
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result1 = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result1);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); verify(origLb).handleResolvedAddresses(result1);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
@ -899,19 +959,21 @@ public class HealthCheckingLoadBalancerFactoryTest {
inOrder.verifyNoMoreInteractions(); inOrder.verifyNoMoreInteractions();
// Service config returns with the same health check name. // Service config returns with the same health check name.
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); hcLbEventDelivery.handleResolvedAddresses(result1);
// It's delivered to origLb, but nothing else happens // It's delivered to origLb, but nothing else happens
inOrder.verify(origLb).handleResolvedAddressGroups( inOrder.verify(origLb).handleResolvedAddresses(result1);
same(resolvedAddressList), same(resolutionAttrs));
assertThat(healthImpl.calls).isEmpty(); assertThat(healthImpl.calls).isEmpty();
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
// Service config returns a different health check name. // Service config returns a different health check name.
resolutionAttrs = attrsWithHealthCheckService("FooService"); resolutionAttrs = attrsWithHealthCheckService("FooService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result2 = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result2);
inOrder.verify(origLb).handleResolvedAddressGroups( inOrder.verify(origLb).handleResolvedAddresses(result2);
same(resolvedAddressList), same(resolutionAttrs));
// Underlying subchannel is now ready // Underlying subchannel is now ready
hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
@ -959,9 +1021,13 @@ public class HealthCheckingLoadBalancerFactoryTest {
@Test @Test
public void balancerShutdown() { public void balancerShutdown() {
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result = ResolvedAddresses.newBuilder()
.setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); verify(origLb).handleResolvedAddresses(result);
verifyNoMoreInteractions(origLb); verifyNoMoreInteractions(origLb);
Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
@ -1011,8 +1077,12 @@ public class HealthCheckingLoadBalancerFactoryTest {
// Verify that HC works // Verify that HC works
Attributes resolutionAttrs = attrsWithHealthCheckService("BarService"); Attributes resolutionAttrs = attrsWithHealthCheckService("BarService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); ResolvedAddresses result = ResolvedAddresses.newBuilder()
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); .setServers(resolvedAddressList)
.setAttributes(resolutionAttrs)
.build();
hcLbEventDelivery.handleResolvedAddresses(result);
verify(origLb).handleResolvedAddresses(result);
createSubchannel(0, Attributes.EMPTY); createSubchannel(0, Attributes.EMPTY);
assertThat(healthImpls[0].calls).isEmpty(); assertThat(healthImpls[0].calls).isEmpty();
hcLbEventDelivery.handleSubchannelState( hcLbEventDelivery.handleSubchannelState(

View File

@ -84,8 +84,9 @@ final class XdsLoadBalancer extends LoadBalancer {
} }
@Override @Override
public void handleResolvedAddressGroups( public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers, Attributes attributes) { List<EquivalentAddressGroup> servers = resolvedAddresses.getServers();
Attributes attributes = resolvedAddresses.getAttributes();
Map<String, ?> newRawLbConfig = checkNotNull( Map<String, ?> newRawLbConfig = checkNotNull(
attributes.get(ATTR_LOAD_BALANCING_CONFIG), "ATTR_LOAD_BALANCING_CONFIG not available"); attributes.get(ATTR_LOAD_BALANCING_CONFIG), "ATTR_LOAD_BALANCING_CONFIG not available");
@ -252,7 +253,13 @@ final class XdsLoadBalancer extends LoadBalancer {
ChannelLogLevel.INFO, "Using fallback policy"); ChannelLogLevel.INFO, "Using fallback policy");
fallbackBalancer = lbRegistry.getProvider(fallbackPolicy.getPolicyName()) fallbackBalancer = lbRegistry.getProvider(fallbackPolicy.getPolicyName())
.newLoadBalancer(helper); .newLoadBalancer(helper);
fallbackBalancer.handleResolvedAddressGroups(fallbackServers, fallbackAttributes); // TODO(carl-mastrangelo): propagate the load balancing config policy
fallbackBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(fallbackServers)
.setAttributes(fallbackAttributes)
.build());
// TODO: maybe update picker // TODO: maybe update picker
} }
@ -268,7 +275,12 @@ final class XdsLoadBalancer extends LoadBalancer {
this.fallbackPolicy = fallbackPolicy; this.fallbackPolicy = fallbackPolicy;
if (fallbackBalancer != null) { if (fallbackBalancer != null) {
if (fallbackPolicy.getPolicyName().equals(currentFallbackPolicy.getPolicyName())) { if (fallbackPolicy.getPolicyName().equals(currentFallbackPolicy.getPolicyName())) {
fallbackBalancer.handleResolvedAddressGroups(fallbackServers, fallbackAttributes); // TODO(carl-mastrangelo): propagate the load balancing config policy
fallbackBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(fallbackServers)
.setAttributes(fallbackAttributes)
.build());
} else { } else {
fallbackBalancer.shutdown(); fallbackBalancer.shutdown();
fallbackBalancer = null; fallbackBalancer = null;

View File

@ -17,8 +17,6 @@
package io.grpc.xds; package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -28,6 +26,7 @@ import io.grpc.ChannelLogger;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry; import io.grpc.LoadBalancerRegistry;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
@ -122,16 +121,20 @@ public class FallbackManagerTest {
fallbackManager.updateFallbackServers( fallbackManager.updateFallbackServers(
eags, Attributes.EMPTY, fallbackPolicy); eags, Attributes.EMPTY, fallbackPolicy);
verify(fakeLb, never()).handleResolvedAddressGroups( verify(fakeLb, never()).handleResolvedAddresses(ArgumentMatchers.any(ResolvedAddresses.class));
ArgumentMatchers.<List<EquivalentAddressGroup>>any(), ArgumentMatchers.<Attributes>any());
fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
verify(fakeLb).handleResolvedAddressGroups( verify(fakeLb).handleResolvedAddresses(
same(eags), ResolvedAddresses.newBuilder()
eq(Attributes.newBuilder() .setServers(eags)
.set(LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, fallbackPolicy.getRawConfigValue()) .setAttributes(
.build())); Attributes.newBuilder()
.set(
LoadBalancer.ATTR_LOAD_BALANCING_CONFIG,
fallbackPolicy.getRawConfigValue())
.build())
.build());
} }
@Test @Test
@ -145,7 +148,6 @@ public class FallbackManagerTest {
fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); fakeClock.forwardTime(FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
verify(fakeLb, never()).handleResolvedAddressGroups( verify(fakeLb, never()).handleResolvedAddresses(ArgumentMatchers.any(ResolvedAddresses.class));
ArgumentMatchers.<List<EquivalentAddressGroup>>any(), ArgumentMatchers.<Attributes>any());
} }
} }

View File

@ -40,6 +40,7 @@ import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry; import io.grpc.LoadBalancerRegistry;
@ -260,7 +261,11 @@ public class XdsLoadBalancerTest {
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw); Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
Attributes attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build(); Attributes attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build();
lb.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), attrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(attrs)
.build());
XdsLbState xdsLbState1 = lb.getXdsLbStateForTest(); XdsLbState xdsLbState1 = lb.getXdsLbStateForTest();
assertThat(xdsLbState1.childPolicy).isNull(); assertThat(xdsLbState1.childPolicy).isNull();
@ -278,7 +283,11 @@ public class XdsLoadBalancerTest {
Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw); Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig2).build(); attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig2).build();
lb.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), attrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(attrs)
.build());
XdsLbState xdsLbState2 = lb.getXdsLbStateForTest(); XdsLbState xdsLbState2 = lb.getXdsLbStateForTest();
assertThat(xdsLbState2.childPolicy).isNull(); assertThat(xdsLbState2.childPolicy).isNull();
@ -303,7 +312,11 @@ public class XdsLoadBalancerTest {
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw); Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
Attributes attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build(); Attributes attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build();
lb.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), attrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(attrs)
.build());
verify(helper).createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>any(), anyString()); verify(helper).createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>any(), anyString());
verify(oobChannel1) verify(oobChannel1)
.newCall(ArgumentMatchers.<MethodDescriptor<?, ?>>any(), .newCall(ArgumentMatchers.<MethodDescriptor<?, ?>>any(),
@ -318,7 +331,11 @@ public class XdsLoadBalancerTest {
Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw); Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig2).build(); attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig2).build();
lb.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), attrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(attrs)
.build());
assertThat(lb.getXdsLbStateForTest().childPolicy).isNotNull(); assertThat(lb.getXdsLbStateForTest().childPolicy).isNotNull();
@ -341,7 +358,11 @@ public class XdsLoadBalancerTest {
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw); Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
Attributes attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build(); Attributes attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build();
lb.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), attrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(attrs)
.build());
verify(helper).createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>any(), anyString()); verify(helper).createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>any(), anyString());
verify(oobChannel1) verify(oobChannel1)
.newCall(ArgumentMatchers.<MethodDescriptor<?, ?>>any(), .newCall(ArgumentMatchers.<MethodDescriptor<?, ?>>any(),
@ -358,7 +379,11 @@ public class XdsLoadBalancerTest {
Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw); Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig2).build(); attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig2).build();
lb.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), attrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(attrs)
.build());
assertThat(lb.getXdsLbStateForTest().childPolicy).isNull(); assertThat(lb.getXdsLbStateForTest().childPolicy).isNull();
@ -380,8 +405,11 @@ public class XdsLoadBalancerTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw); Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
Attributes attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build(); Attributes attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build();
lb.handleResolvedAddresses(
lb.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), attrs); ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(attrs)
.build());
assertThat(lb.getXdsLbStateForTest().childPolicy).isNotNull(); assertThat(lb.getXdsLbStateForTest().childPolicy).isNotNull();
verify(helper).createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>any(), anyString()); verify(helper).createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>any(), anyString());
@ -398,7 +426,11 @@ public class XdsLoadBalancerTest {
Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw); Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig2).build(); attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig2).build();
lb.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), attrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(attrs)
.build());
assertThat(lb.getXdsLbStateForTest().childPolicy).isNotNull(); assertThat(lb.getXdsLbStateForTest().childPolicy).isNotNull();
// verify oobChannel is unchanged // verify oobChannel is unchanged
@ -420,7 +452,11 @@ public class XdsLoadBalancerTest {
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw); Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
Attributes attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build(); Attributes attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build();
lb.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), attrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(attrs)
.build());
verify(helper).createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>any(), anyString()); verify(helper).createOobChannel(ArgumentMatchers.<EquivalentAddressGroup>any(), anyString());
verify(oobChannel1) verify(oobChannel1)
.newCall(ArgumentMatchers.<MethodDescriptor<?, ?>>any(), .newCall(ArgumentMatchers.<MethodDescriptor<?, ?>>any(),
@ -435,7 +471,11 @@ public class XdsLoadBalancerTest {
Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw); Map<String, ?> lbConfig2 = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig2).build(); attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig2).build();
lb.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), attrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(attrs)
.build());
assertThat(lb.getXdsLbStateForTest().childPolicy).isNotNull(); assertThat(lb.getXdsLbStateForTest().childPolicy).isNotNull();
@ -453,63 +493,71 @@ public class XdsLoadBalancerTest {
@Test @Test
public void fallback_AdsNotWorkingYetTimerExpired() throws Exception { public void fallback_AdsNotWorkingYetTimerExpired() throws Exception {
lb.handleResolvedAddressGroups( lb.handleResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(), standardModeWithFallback1Attributes()); ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(standardModeWithFallback1Attributes())
.build());
assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1);
assertThat(fakeClock.getPendingTasks()).isEmpty(); assertThat(fakeClock.getPendingTasks()).isEmpty();
ArgumentCaptor<Attributes> captor = ArgumentCaptor.forClass(Attributes.class); ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(fakeBalancer1).handleResolvedAddressGroups( verify(fakeBalancer1).handleResolvedAddresses(captor.capture());
ArgumentMatchers.<List<EquivalentAddressGroup>>any(), captor.capture()); assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG))
assertThat(captor.getValue().get(ATTR_LOAD_BALANCING_CONFIG))
.containsExactly("supported_1_option", "yes"); .containsExactly("supported_1_option", "yes");
} }
@Test @Test
public void fallback_AdsWorkingTimerCancelled() throws Exception { public void fallback_AdsWorkingTimerCancelled() throws Exception {
lb.handleResolvedAddressGroups( lb.handleResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(), standardModeWithFallback1Attributes()); ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(standardModeWithFallback1Attributes())
.build());
serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance()); serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance());
assertThat(fakeClock.getPendingTasks()).isEmpty(); assertThat(fakeClock.getPendingTasks()).isEmpty();
verify(fakeBalancer1, never()).handleResolvedAddressGroups( verify(fakeBalancer1, never()).handleResolvedAddresses(
ArgumentMatchers.<List<EquivalentAddressGroup>>any(), ArgumentMatchers.<Attributes>any()); ArgumentMatchers.any(ResolvedAddresses.class));
} }
@Test @Test
public void fallback_AdsErrorAndNoActiveSubchannel() throws Exception { public void fallback_AdsErrorAndNoActiveSubchannel() throws Exception {
lb.handleResolvedAddressGroups( lb.handleResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(), standardModeWithFallback1Attributes()); ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(standardModeWithFallback1Attributes())
.build());
serverResponseWriter.onError(new Exception("fake error")); serverResponseWriter.onError(new Exception("fake error"));
ArgumentCaptor<Attributes> captor = ArgumentCaptor.forClass(Attributes.class); ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(fakeBalancer1).handleResolvedAddressGroups( verify(fakeBalancer1).handleResolvedAddresses(captor.capture());
ArgumentMatchers.<List<EquivalentAddressGroup>>any(), captor.capture()); assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG))
assertThat(captor.getValue().get(ATTR_LOAD_BALANCING_CONFIG))
.containsExactly("supported_1_option", "yes"); .containsExactly("supported_1_option", "yes");
assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1); assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1);
assertThat(fakeClock.getPendingTasks()).isEmpty(); assertThat(fakeClock.getPendingTasks()).isEmpty();
// verify handleResolvedAddressGroups() is not called again // verify handleResolvedAddresses() is not called again
verify(fakeBalancer1).handleResolvedAddressGroups( verify(fakeBalancer1).handleResolvedAddresses(ArgumentMatchers.any(ResolvedAddresses.class));
ArgumentMatchers.<List<EquivalentAddressGroup>>any(), ArgumentMatchers.<Attributes>any());
} }
@Test @Test
public void fallback_AdsErrorWithActiveSubchannel() throws Exception { public void fallback_AdsErrorWithActiveSubchannel() throws Exception {
lb.handleResolvedAddressGroups( lb.handleResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(), standardModeWithFallback1Attributes()); ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(standardModeWithFallback1Attributes())
.build());
serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance()); serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance());
doReturn(true).when(fakeSubchannelStore).hasReadyBackends(); doReturn(true).when(fakeSubchannelStore).hasReadyBackends();
serverResponseWriter.onError(new Exception("fake error")); serverResponseWriter.onError(new Exception("fake error"));
verify(fakeBalancer1, never()).handleResolvedAddressGroups( verify(fakeBalancer1, never()).handleResolvedAddresses(
ArgumentMatchers.<List<EquivalentAddressGroup>>any(), ArgumentMatchers.<Attributes>any()); ArgumentMatchers.any(ResolvedAddresses.class));
Subchannel subchannel = new Subchannel() { Subchannel subchannel = new Subchannel() {
@Override @Override
@ -533,10 +581,9 @@ public class XdsLoadBalancerTest {
lb.handleSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure( lb.handleSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE)); Status.UNAVAILABLE));
ArgumentCaptor<Attributes> captor = ArgumentCaptor.forClass(Attributes.class); ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(fakeBalancer1).handleResolvedAddressGroups( verify(fakeBalancer1).handleResolvedAddresses(captor.capture());
ArgumentMatchers.<List<EquivalentAddressGroup>>any(), captor.capture()); assertThat(captor.getValue().getAttributes().get(ATTR_LOAD_BALANCING_CONFIG))
assertThat(captor.getValue().get(ATTR_LOAD_BALANCING_CONFIG))
.containsExactly("supported_1_option", "yes"); .containsExactly("supported_1_option", "yes");
} }
@ -560,7 +607,11 @@ public class XdsLoadBalancerTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw); Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
Attributes attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build(); Attributes attrs = Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build();
lb.handleResolvedAddressGroups(Collections.<EquivalentAddressGroup>emptyList(), attrs); lb.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setServers(Collections.<EquivalentAddressGroup>emptyList())
.setAttributes(attrs)
.build());
assertThat(fakeClock.getPendingTasks()).isNotEmpty(); assertThat(fakeClock.getPendingTasks()).isNotEmpty();
lb.shutdown(); lb.shutdown();