core, grpclb: change policy selection strategy for Grpclb policy (take one: eliminate special logic for deciding grpclb policy in core) (#6637)

First take for grpclb selection stabilization: 

1. Changed DnsNameResolver to return balancer addresses as a GrpcAttributes.ATTR_LB_ADDRS attribute in ResolutionResult, instead of among the addresses.

2. AutoConfiguredLoadBalancerFactory decides LB policy solely based on parsed service config without looking at resolved addresses. Behavior changes:
  - If no LB policy is specified in service config, default to pick_first, even if there exist balancer addresses (in attributes).
  - If grpclb specified but not available and no other specified policies available, it will fail without fallback to round_robin.

3. GrpclbLoadBalancer populates balancer addresses from ResolvedAddresses's attribute (GrpclbConstants.ATTR_LB_ADDRS) instead of sieving from addresses.
This commit is contained in:
Chengyuan Zhang 2020-01-31 10:41:43 -08:00 committed by GitHub
parent 3e6a77a7ef
commit c0f37e59ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 492 additions and 619 deletions

View File

@ -223,6 +223,7 @@ public final class AltsProtocolNegotiator {
return SCHEME;
}
@SuppressWarnings("deprecation")
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler);

View File

@ -41,18 +41,13 @@ import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import javax.annotation.Nullable;
// TODO(creamsoup) fully deprecate LoadBalancer.ATTR_LOAD_BALANCING_CONFIG
@SuppressWarnings("deprecation")
public final class AutoConfiguredLoadBalancerFactory {
private static final Logger logger =
Logger.getLogger(AutoConfiguredLoadBalancerFactory.class.getName());
private static final String GRPCLB_POLICY_NAME = "grpclb";
private final LoadBalancerRegistry registry;
private final String defaultPolicy;
@ -92,7 +87,6 @@ public final class AutoConfiguredLoadBalancerFactory {
private final Helper helper;
private LoadBalancer delegate;
private LoadBalancerProvider delegateProvider;
private boolean roundRobinDueToGrpclbDepMissing;
AutoConfiguredLoadBalancer(Helper helper) {
this.helper = helper;
@ -125,10 +119,11 @@ public final class AutoConfiguredLoadBalancerFactory {
}
PolicySelection policySelection =
(PolicySelection) resolvedAddresses.getLoadBalancingPolicyConfig();
ResolvedPolicySelection resolvedSelection;
if (policySelection == null) {
LoadBalancerProvider defaultProvider;
try {
resolvedSelection = resolveLoadBalancerProvider(servers, policySelection);
defaultProvider = getProviderOrThrow(defaultPolicy, "using default policy");
} catch (PolicyException e) {
Status s = Status.INTERNAL.withDescription(e.getMessage());
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(s));
@ -137,36 +132,40 @@ public final class AutoConfiguredLoadBalancerFactory {
delegate = new NoopLoadBalancer();
return Status.OK;
}
PolicySelection selection = resolvedSelection.policySelection;
policySelection =
new PolicySelection(defaultProvider, /* rawConfig= */ null, /* config= */ null);
}
if (delegateProvider == null
|| !selection.provider.getPolicyName().equals(delegateProvider.getPolicyName())) {
|| !policySelection.provider.getPolicyName().equals(delegateProvider.getPolicyName())) {
helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptyPicker());
delegate.shutdown();
delegateProvider = selection.provider;
delegateProvider = policySelection.provider;
LoadBalancer old = delegate;
delegate = delegateProvider.newLoadBalancer(helper);
helper.getChannelLogger().log(
ChannelLogLevel.INFO, "Load balancer changed from {0} to {1}",
old.getClass().getSimpleName(), delegate.getClass().getSimpleName());
}
Object lbConfig = selection.config;
Object lbConfig = policySelection.config;
if (lbConfig != null) {
helper.getChannelLogger().log(
ChannelLogLevel.DEBUG, "Load-balancing config: {0}", selection.config);
ChannelLogLevel.DEBUG, "Load-balancing config: {0}", policySelection.config);
attributes =
attributes.toBuilder().set(ATTR_LOAD_BALANCING_CONFIG, selection.rawConfig).build();
attributes.toBuilder()
.set(ATTR_LOAD_BALANCING_CONFIG, policySelection.rawConfig)
.build();
}
LoadBalancer delegate = getDelegate();
if (resolvedSelection.serverList.isEmpty()
if (resolvedAddresses.getAddresses().isEmpty()
&& !delegate.canHandleEmptyAddressListFromNameResolution()) {
return Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address. addrs=" + servers + ", attrs=" + attributes);
} else {
delegate.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(resolvedSelection.serverList)
.setAddresses(resolvedAddresses.getAddresses())
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(lbConfig)
.build());
@ -206,78 +205,6 @@ public final class AutoConfiguredLoadBalancerFactory {
LoadBalancerProvider getDelegateProvider() {
return delegateProvider;
}
/**
* Resolves a load balancer based on given criteria. If policySelection is {@code null} and
* given servers contains any gRPC LB addresses, it will fall back to "grpclb". If no gRPC LB
* addresses are not present, it will fall back to {@link #defaultPolicy}.
*
* @param servers The list of servers reported
* @param policySelection the selected policy from raw service config
* @return the resolved policy selection
*/
@VisibleForTesting
ResolvedPolicySelection resolveLoadBalancerProvider(
List<EquivalentAddressGroup> servers, @Nullable PolicySelection policySelection)
throws PolicyException {
// Check for balancer addresses
boolean haveBalancerAddress = false;
List<EquivalentAddressGroup> backendAddrs = new ArrayList<>();
for (EquivalentAddressGroup s : servers) {
if (s.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY) != null) {
haveBalancerAddress = true;
} else {
backendAddrs.add(s);
}
}
if (policySelection != null) {
String policyName = policySelection.provider.getPolicyName();
return new ResolvedPolicySelection(
policySelection, policyName.equals(GRPCLB_POLICY_NAME) ? servers : backendAddrs);
}
if (haveBalancerAddress) {
// This is a special case where the existence of balancer address in the resolved address
// selects "grpclb" policy if the service config couldn't select a policy
LoadBalancerProvider grpclbProvider = registry.getProvider(GRPCLB_POLICY_NAME);
if (grpclbProvider == null) {
if (backendAddrs.isEmpty()) {
throw new PolicyException(
"Received ONLY balancer addresses but grpclb runtime is missing");
}
if (!roundRobinDueToGrpclbDepMissing) {
// We don't log the warning every time we have an update.
roundRobinDueToGrpclbDepMissing = true;
String errorMsg = "Found balancer addresses but grpclb runtime is missing."
+ " Will use round_robin. Please include grpc-grpclb in your runtime dependencies.";
helper.getChannelLogger().log(ChannelLogLevel.ERROR, errorMsg);
logger.warning(errorMsg);
}
return new ResolvedPolicySelection(
new PolicySelection(
getProviderOrThrow(
"round_robin", "received balancer addresses but grpclb runtime is missing"),
/* rawConfig = */ null,
/* config= */ null),
backendAddrs);
}
return new ResolvedPolicySelection(
new PolicySelection(
grpclbProvider, /* rawConfig= */ null, /* config= */ null), servers);
}
// No balancer address this time. If balancer address shows up later, we want to make sure
// the warning is logged one more time.
roundRobinDueToGrpclbDepMissing = false;
// No config nor balancer address. Use default.
return new ResolvedPolicySelection(
new PolicySelection(
getProviderOrThrow(defaultPolicy, "using default policy"),
/* rawConfig= */ null,
/* config= */ null),
servers);
}
}
private LoadBalancerProvider getProviderOrThrow(String policy, String choiceReason)
@ -406,26 +333,6 @@ public final class AutoConfiguredLoadBalancerFactory {
}
}
@VisibleForTesting
static final class ResolvedPolicySelection {
final PolicySelection policySelection;
final List<EquivalentAddressGroup> serverList;
ResolvedPolicySelection(
PolicySelection policySelection, List<EquivalentAddressGroup> serverList) {
this.policySelection = checkNotNull(policySelection, "policySelection");
this.serverList = Collections.unmodifiableList(checkNotNull(serverList, "serverList"));
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("policySelection", policySelection)
.add("serverList", serverList)
.toString();
}
}
private static final class EmptyPicker extends SubchannelPicker {
@Override

View File

@ -298,14 +298,12 @@ final class DnsNameResolver extends NameResolver {
for (InetAddress inetAddr : resolutionResults.addresses) {
servers.add(new EquivalentAddressGroup(new InetSocketAddress(inetAddr, port)));
}
servers.addAll(resolutionResults.balancerAddresses);
if (servers.isEmpty()) {
savedListener.onError(Status.UNAVAILABLE.withDescription(
"No DNS backend or balancer addresses found for " + host));
return;
}
ResolutionResult.Builder resultBuilder = ResolutionResult.newBuilder().setAddresses(servers);
Attributes.Builder attributesBuilder = Attributes.newBuilder();
if (!resolutionResults.balancerAddresses.isEmpty()) {
attributesBuilder.set(GrpcAttributes.ATTR_LB_ADDRS, resolutionResults.balancerAddresses);
}
if (!resolutionResults.txtRecords.isEmpty()) {
ConfigOrError rawServiceConfig =
parseServiceConfig(resolutionResults.txtRecords, random, getLocalHostname());
@ -319,17 +317,14 @@ final class DnsNameResolver extends NameResolver {
Map<String, ?> verifiedRawServiceConfig = (Map<String, ?>) rawServiceConfig.getConfig();
ConfigOrError parsedServiceConfig =
serviceConfigParser.parseServiceConfig(verifiedRawServiceConfig);
resultBuilder
.setAttributes(
Attributes.newBuilder()
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, verifiedRawServiceConfig)
.build())
.setServiceConfig(parsedServiceConfig);
resultBuilder.setServiceConfig(parsedServiceConfig);
attributesBuilder
.set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, verifiedRawServiceConfig);
}
} else {
logger.log(Level.FINE, "No TXT records found for {0}", new Object[]{host});
}
savedListener.onResult(resultBuilder.build());
savedListener.onResult(resultBuilder.setAttributes(attributesBuilder.build()).build());
}
}

View File

@ -21,6 +21,7 @@ import io.grpc.EquivalentAddressGroup;
import io.grpc.Grpc;
import io.grpc.NameResolver;
import io.grpc.SecurityLevel;
import java.util.List;
import java.util.Map;
/**
@ -37,10 +38,23 @@ public final class GrpcAttributes {
public static final Attributes.Key<Map<String, ?>> NAME_RESOLVER_SERVICE_CONFIG =
Attributes.Key.create("service-config");
/**
* Attribute key for gRPC LB server addresses.
*
* <p>Deprecated: this will be used for grpclb specific logic, which will be moved out of core.
*/
@Deprecated
@NameResolver.ResolutionResultAttr
public static final Attributes.Key<List<EquivalentAddressGroup>> ATTR_LB_ADDRS =
Attributes.Key.create("io.grpc.grpclb.lbAddrs");
/**
* The naming authority of a gRPC LB server address. It is an address-group-level attribute,
* present when the address group is a LoadBalancer.
*
* <p>Deprecated: this will be used for grpclb specific logic, which will be moved out of core.
*/
@Deprecated
@EquivalentAddressGroup.Attr
public static final Attributes.Key<String> ATTR_LB_ADDR_AUTHORITY =
Attributes.Key.create("io.grpc.grpclb.lbAddrAuthority");

View File

@ -129,6 +129,7 @@ final class JndiResourceResolverFactory implements DnsNameResolver.ResourceResol
return Collections.unmodifiableList(serviceConfigTxtRecords);
}
@SuppressWarnings("deprecation")
@Override
public List<EquivalentAddressGroup> resolveSrv(
AddressResolver addressResolver, String grpclbHostname) throws Exception {

View File

@ -19,13 +19,10 @@ package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -50,23 +47,18 @@ import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.grpclb.GrpclbLoadBalancerProvider;
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.PolicyException;
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.PolicySelection;
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.ResolvedPolicySelection;
import io.grpc.util.ForwardingLoadBalancerHelper;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -308,37 +300,42 @@ public class AutoConfiguredLoadBalancerFactoryTest {
}
@Test
public void handleResolvedAddressGroups_propagateOnlyBackendAddrsToDelegate() throws Exception {
// This case only happens when grpclb is missing. We will use a local registry
LoadBalancerRegistry registry = new LoadBalancerRegistry();
registry.register(new PickFirstLoadBalancerProvider());
registry.register(
new FakeLoadBalancerProvider(
"round_robin", testLbBalancer, /* nextParsedLbPolicyConfig= */ null));
public void handleResolvedAddressGroups_propagateAddrsToDelegate() throws Exception {
Map<String, ?> rawServiceConfig =
parseConfig("{\"loadBalancingConfig\": [ {\"test_lb\": { \"setting1\": \"high\" } } ] }");
ConfigOrError lbConfigs = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger);
assertThat(lbConfigs.getConfig()).isNotNull();
final List<EquivalentAddressGroup> servers =
Arrays.asList(
new EquivalentAddressGroup(new SocketAddress(){}),
new EquivalentAddressGroup(
new SocketAddress(){},
Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()));
Helper helper = new TestHelper();
AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory(
registry, GrpcUtil.DEFAULT_LB_POLICY).newLoadBalancer(helper);
AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(helper);
List<EquivalentAddressGroup> servers =
Collections.singletonList(new EquivalentAddressGroup(new InetSocketAddress(8080){}));
Status handleResult = lb.tryHandleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(Attributes.EMPTY)
.setLoadBalancingPolicyConfig(lbConfigs.getConfig())
.build());
verify(testLbBalancerProvider).newLoadBalancer(same(helper));
assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK);
assertThat(lb.getDelegate()).isSameInstanceAs(testLbBalancer);
ArgumentCaptor<ResolvedAddresses> resultCaptor =
ArgumentCaptor.forClass(ResolvedAddresses.class);
verify(testLbBalancer).handleResolvedAddresses(resultCaptor.capture());
assertThat(resultCaptor.getValue().getAddresses()).containsExactlyElementsIn(servers).inOrder();
servers =
Collections.singletonList(new EquivalentAddressGroup(new InetSocketAddress(9090){}));
handleResult = lb.tryHandleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setLoadBalancingPolicyConfig(lbConfigs.getConfig())
.build());
assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK);
assertThat(lb.getDelegate()).isSameInstanceAs(testLbBalancer);
verify(testLbBalancer).handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(Collections.singletonList(servers.get(0)))
.setAttributes(Attributes.EMPTY)
.build());
verify(testLbBalancer, times(2)).handleResolvedAddresses(resultCaptor.capture());
assertThat(resultCaptor.getValue().getAddresses()).containsExactlyElementsIn(servers).inOrder();
}
@Test
@ -392,267 +389,79 @@ public class AutoConfiguredLoadBalancerFactoryTest {
}
@Test
public void decideLoadBalancerProvider_noBalancerAddresses_noServiceConfig_pickFirst()
throws Exception {
AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper());
PolicySelection policySelection = null;
List<EquivalentAddressGroup> servers =
Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){}));
ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection);
assertThat(selection.policySelection.provider)
.isInstanceOf(PickFirstLoadBalancerProvider.class);
assertThat(selection.serverList).isEqualTo(servers);
assertThat(selection.policySelection.config).isNull();
verifyZeroInteractions(channelLogger);
}
@Test
public void decideLoadBalancerProvider_noBalancerAddresses_noServiceConfig_customDefault()
throws Exception {
AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory("test_lb")
.newLoadBalancer(new TestHelper());
PolicySelection policySelection = null;
List<EquivalentAddressGroup> servers =
Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){}));
ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection);
assertThat(selection.policySelection.provider).isSameInstanceAs(testLbBalancerProvider);
assertThat(selection.serverList).isEqualTo(servers);
assertThat(selection.policySelection.config).isNull();
verifyZeroInteractions(channelLogger);
}
@Test
public void decideLoadBalancerProvider_oneBalancer_noServiceConfig_grpclb() throws Exception {
AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper());
PolicySelection policySelection = null;
List<EquivalentAddressGroup> servers =
Collections.singletonList(
new EquivalentAddressGroup(
new SocketAddress(){},
Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()));
ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection);
assertThat(selection.policySelection.provider).isInstanceOf(GrpclbLoadBalancerProvider.class);
assertThat(selection.serverList).isEqualTo(servers);
assertThat(selection.policySelection.config).isNull();
verifyZeroInteractions(channelLogger);
}
@Test
public void decideLoadBalancerProvider_serviceConfigLbPolicy() throws Exception {
AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper());
Map<String, ?> rawServiceConfig =
parseConfig("{\"loadBalancingPolicy\": \"round_robin\"}");
ConfigOrError lbConfig = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger);
assertThat(lbConfig.getConfig()).isNotNull();
PolicySelection policySelection = (PolicySelection) lbConfig.getConfig();
List<EquivalentAddressGroup> servers =
Arrays.asList(
new EquivalentAddressGroup(
new SocketAddress(){},
Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()),
new EquivalentAddressGroup(
new SocketAddress(){}));
List<EquivalentAddressGroup> backends = Arrays.asList(servers.get(1));
ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection);
assertThat(selection.policySelection.provider.getClass().getName()).isEqualTo(
"io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider");
assertThat(selection.serverList).isEqualTo(backends);
verifyZeroInteractions(channelLogger);
}
@Test
public void decideLoadBalancerProvider_serviceConfigLbConfig() throws Exception {
AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper());
public void handleResolvedAddressGroups_useSelectedLbPolicy() throws Exception {
Map<String, ?> rawServiceConfig =
parseConfig("{\"loadBalancingConfig\": [{\"round_robin\": {}}]}");
ConfigOrError lbConfig = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger);
assertThat(lbConfig.getConfig()).isNotNull();
PolicySelection policySelection = (PolicySelection) lbConfig.getConfig();
List<EquivalentAddressGroup> servers =
Arrays.asList(
new EquivalentAddressGroup(
new SocketAddress(){},
Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()),
new EquivalentAddressGroup(
new SocketAddress(){}));
List<EquivalentAddressGroup> backends = Arrays.asList(servers.get(1));
ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection);
assertThat(selection.policySelection.provider.getClass().getName()).isEqualTo(
"io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider");
assertThat(selection.serverList).isEqualTo(backends);
verifyZeroInteractions(channelLogger);
}
@Test
public void decideLoadBalancerProvider_grpclbConfigPropagated() throws Exception {
AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper());
Map<String, ?> rawServiceConfig =
parseConfig(
"{\"loadBalancingConfig\": ["
+ "{\"grpclb\": {\"childPolicy\": [ {\"pick_first\": {} } ] } }"
+ "] }");
ConfigOrError lbConfig = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger);
assertThat(lbConfig.getConfig()).isNotNull();
PolicySelection policySelection = (PolicySelection) lbConfig.getConfig();
List<EquivalentAddressGroup> servers =
Collections.singletonList(
new EquivalentAddressGroup(
new SocketAddress(){},
Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()));
ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection);
assertThat(selection.policySelection.provider).isInstanceOf(GrpclbLoadBalancerProvider.class);
assertThat(selection.serverList).isEqualTo(servers);
assertThat(selection.policySelection.config)
.isEqualTo(((PolicySelection) lbConfig.getConfig()).config);
verifyZeroInteractions(channelLogger);
}
@Test
public void decideLoadBalancerProvider_policyUnavailButGrpclbAddressPresent() throws Exception {
AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper());
List<EquivalentAddressGroup> servers =
Collections.singletonList(
new EquivalentAddressGroup(
new SocketAddress(){},
Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()));
ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, null);
assertThat(selection.policySelection.provider).isInstanceOf(GrpclbLoadBalancerProvider.class);
assertThat(selection.serverList).isEqualTo(servers);
assertThat(selection.policySelection.config).isNull();
verifyZeroInteractions(channelLogger);
}
@Test
public void decideLoadBalancerProvider_grpclbProviderNotFound_fallbackToRoundRobin()
throws Exception {
LoadBalancerRegistry registry = new LoadBalancerRegistry();
registry.register(new PickFirstLoadBalancerProvider());
LoadBalancerProvider fakeRoundRobinProvider =
new FakeLoadBalancerProvider("round_robin", testLbBalancer, null);
registry.register(fakeRoundRobinProvider);
AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory(
registry, GrpcUtil.DEFAULT_LB_POLICY).newLoadBalancer(new TestHelper());
List<EquivalentAddressGroup> servers =
Arrays.asList(
new EquivalentAddressGroup(
new SocketAddress(){},
Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()),
new EquivalentAddressGroup(new SocketAddress(){}));
ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, null);
assertThat(selection.policySelection.provider).isSameInstanceAs(fakeRoundRobinProvider);
assertThat(selection.policySelection.config).isNull();
verify(channelLogger).log(
eq(ChannelLogLevel.ERROR),
startsWith("Found balancer addresses but grpclb runtime is missing"));
// Called for the second time, the warning is only logged once
selection = lb.resolveLoadBalancerProvider(servers, null);
assertThat(selection.policySelection.provider).isSameInstanceAs(fakeRoundRobinProvider);
assertThat(selection.policySelection.config).isNull();
// Balancer addresses are filtered out in the server list passed to round_robin
assertThat(selection.serverList).containsExactly(servers.get(1));
verifyNoMoreInteractions(channelLogger);;
}
@Test
public void decideLoadBalancerProvider_grpclbProviderNotFound_noBackendAddress()
throws Exception {
LoadBalancerRegistry registry = new LoadBalancerRegistry();
registry.register(new PickFirstLoadBalancerProvider());
registry.register(new FakeLoadBalancerProvider("round_robin", testLbBalancer, null));
AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory(
registry, GrpcUtil.DEFAULT_LB_POLICY).newLoadBalancer(new TestHelper());
List<EquivalentAddressGroup> servers =
Collections.singletonList(
new EquivalentAddressGroup(
new SocketAddress(){},
Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()));
try {
lb.resolveLoadBalancerProvider(servers, null);
fail("Should throw");
} catch (PolicyException e) {
assertThat(e)
.hasMessageThat()
.isEqualTo("Received ONLY balancer addresses but grpclb runtime is missing");
}
}
@Test
public void decideLoadBalancerProvider_serviceConfigLbConfigOverridesDefault() throws Exception {
AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(new TestHelper());
Map<String, ?> rawServiceConfig =
parseConfig("{\"loadBalancingConfig\": [ {\"round_robin\": {} } ] }");
ConfigOrError lbConfigs = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger);
assertThat(lbConfigs.getConfig()).isNotNull();
PolicySelection policySelection = (PolicySelection) lbConfigs.getConfig();
assertThat(((PolicySelection) lbConfigs.getConfig()).provider.getClass().getName())
.isEqualTo("io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider");
final List<EquivalentAddressGroup> servers =
Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){}));
Helper helper = new TestHelper() {
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
assertThat(args.getAddresses()).isEqualTo(servers);
return new TestSubchannel(args);
}
};
AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(helper);
Status handleResult = lb.tryHandleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setLoadBalancingPolicyConfig(lbConfigs.getConfig())
.build());
assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK);
assertThat(lb.getDelegate().getClass().getName())
.isEqualTo("io.grpc.util.RoundRobinLoadBalancer");
}
@Test
public void handleResolvedAddressGroups_noLbPolicySelected_defaultToPickFirst() {
final List<EquivalentAddressGroup> servers =
Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){}));
Helper helper = new TestHelper() {
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
assertThat(args.getAddresses()).isEqualTo(servers);
return new TestSubchannel(args);
}
};
AutoConfiguredLoadBalancer lb = lbf.newLoadBalancer(helper);
Status handleResult = lb.tryHandleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setLoadBalancingPolicyConfig(null)
.build());
assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK);
assertThat(lb.getDelegate()).isInstanceOf(PickFirstLoadBalancer.class);
}
@Test
public void handleResolvedAddressGroups_noLbPolicySelected_defaultToCustomDefault() {
AutoConfiguredLoadBalancer lb = new AutoConfiguredLoadBalancerFactory("test_lb")
.newLoadBalancer(new TestHelper());
List<EquivalentAddressGroup> servers =
Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){}));
ResolvedPolicySelection selection = lb.resolveLoadBalancerProvider(servers, policySelection);
assertThat(selection.policySelection.provider.getClass().getName()).isEqualTo(
"io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider");
verifyZeroInteractions(channelLogger);
Status handleResult = lb.tryHandleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setLoadBalancingPolicyConfig(null)
.build());
assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK);
assertThat(lb.getDelegate()).isSameInstanceAs(testLbBalancer);
}
@Test
public void channelTracing_lbPolicyChanged() throws Exception {
final FakeClock clock = new FakeClock();
List<EquivalentAddressGroup> servers =
Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){}));
Helper helper = new TestHelper() {
@Override
@Deprecated
public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
return new TestSubchannel(CreateSubchannelArgs.newBuilder()
.setAddresses(addrs)
.setAttributes(attrs)
.build());
}
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
return new TestSubchannel(args);
}
@Override
public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
return mock(ManagedChannel.class, RETURNS_DEEP_STUBS);
}
@Override
public String getAuthority() {
return "fake_authority";
}
@Override
public SynchronizationContext getSynchronizationContext() {
return new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});
}
@Override
public ScheduledExecutorService getScheduledExecutorService() {
return clock.getScheduledExecutorService();
}
};
AutoConfiguredLoadBalancer lb =
@ -705,23 +514,6 @@ public class AutoConfiguredLoadBalancerFactoryTest {
eq("Load-balancing config: {0}"),
eq(testLbParsedConfig.getConfig()));
verifyNoMoreInteractions(channelLogger);
servers = Collections.singletonList(new EquivalentAddressGroup(
new SocketAddress(){},
Attributes.newBuilder().set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "ok").build()));
handleResult = lb.tryHandleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(Attributes.EMPTY)
.build());
assertThat(handleResult.getCode()).isEqualTo(Status.Code.OK);
verify(channelLogger).log(
eq(ChannelLogLevel.INFO),
eq("Load balancer changed from {0} to {1}"),
eq(testLbBalancer.getClass().getSimpleName()), eq("GrpclbLoadBalancer"));
verifyNoMoreInteractions(channelLogger);
}
@Test
@ -834,6 +626,21 @@ public class AutoConfiguredLoadBalancerFactoryTest {
eq(new ArrayList<>(Collections.singletonList("magic_balancer"))));
}
@Test
public void parseLoadBalancerConfig_lbConfigPropagated() throws Exception {
Map<String, ?> rawServiceConfig =
parseConfig(
"{\"loadBalancingConfig\": ["
+ "{\"grpclb\": {\"childPolicy\": [ {\"pick_first\": {} } ] } }"
+ "] }");
ConfigOrError parsed = lbf.parseLoadBalancerPolicy(rawServiceConfig, channelLogger);
assertThat(parsed).isNotNull();
assertThat(parsed.getConfig()).isNotNull();
PolicySelection policySelection = (PolicySelection) parsed.getConfig();
assertThat(policySelection.config).isNotNull();
assertThat(policySelection.provider).isInstanceOf(GrpclbLoadBalancerProvider.class);
verifyZeroInteractions(channelLogger);
}
public static class ForwardingLoadBalancer extends LoadBalancer {
private final LoadBalancer delegate;

View File

@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.net.InetAddresses;
import com.google.common.testing.FakeTicker;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.EquivalentAddressGroup;
import io.grpc.HttpConnectProxiedSocketAddress;
@ -46,7 +47,6 @@ import io.grpc.NameResolver.ServiceConfigParser;
import io.grpc.ProxyDetector;
import io.grpc.StaticTestingClassLoader;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.internal.DnsNameResolver.AddressResolver;
import io.grpc.internal.DnsNameResolver.ResolutionResults;
@ -154,7 +154,8 @@ public class DnsNameResolverTest {
private DnsNameResolver newResolver(String name, int defaultPort, boolean isAndroid) {
return newResolver(
name, defaultPort, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(), isAndroid);
name, defaultPort, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(),
isAndroid, false);
}
private DnsNameResolver newResolver(
@ -162,7 +163,7 @@ public class DnsNameResolverTest {
int defaultPort,
ProxyDetector proxyDetector,
Stopwatch stopwatch) {
return newResolver(name, defaultPort, proxyDetector, stopwatch, false);
return newResolver(name, defaultPort, proxyDetector, stopwatch, false, false);
}
private DnsNameResolver newResolver(
@ -170,7 +171,8 @@ public class DnsNameResolverTest {
final int defaultPort,
final ProxyDetector proxyDetector,
Stopwatch stopwatch,
boolean isAndroid) {
boolean isAndroid,
boolean enableSrv) {
NameResolver.Args args =
NameResolver.Args.newBuilder()
.setDefaultPort(defaultPort)
@ -179,19 +181,34 @@ public class DnsNameResolverTest {
.setServiceConfigParser(mock(ServiceConfigParser.class))
.setChannelLogger(mock(ChannelLogger.class))
.build();
return newResolver(name, stopwatch, isAndroid, args);
return newResolver(name, stopwatch, isAndroid, args, enableSrv);
}
private DnsNameResolver newResolver(
String name, Stopwatch stopwatch, boolean isAndroid, NameResolver.Args args) {
return newResolver(name, stopwatch, isAndroid, args, /* enableSrv= */ false);
}
private DnsNameResolver newResolver(
String name,
Stopwatch stopwatch,
boolean isAndroid,
NameResolver.Args args,
boolean enableSrv) {
DnsNameResolver dnsResolver =
new DnsNameResolver(
null, name, args, fakeExecutorResource, stopwatch, isAndroid, /* enableSrv= */ false);
null, name, args, fakeExecutorResource, stopwatch, isAndroid, enableSrv);
// By default, using the mocked ResourceResolver to avoid I/O
dnsResolver.setResourceResolver(new JndiResourceResolver(recordFetcher));
return dnsResolver;
}
private DnsNameResolver newSrvEnabledResolver(String name, int defaultPort) {
return newResolver(
name, defaultPort, GrpcUtil.NOOP_PROXY_DETECTOR, Stopwatch.createUnstarted(),
false, true);
}
@Before
public void setUp() {
DnsNameResolver.enableJndi = true;
@ -363,26 +380,6 @@ public class DnsNameResolverTest {
assertThat(executions.get()).isEqualTo(1);
}
@Test
public void resolveAll_failsOnEmptyResult() {
DnsNameResolver nr = newResolver("dns:///addr.fake:1234", 443);
nr.setAddressResolver(new AddressResolver() {
@Override
public List<InetAddress> resolveAddress(String host) throws Exception {
return Collections.emptyList();
}
});
nr.start(mockListener);
assertThat(fakeExecutor.runDueTasks()).isEqualTo(1);
ArgumentCaptor<Status> ac = ArgumentCaptor.forClass(Status.class);
verify(mockListener).onError(ac.capture());
verifyNoMoreInteractions(mockListener);
assertThat(ac.getValue().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(ac.getValue().getDescription()).contains("No DNS backend or balancer addresses");
}
@Test
public void resolve_cacheForever() throws Exception {
System.setProperty(DnsNameResolver.NETWORKADDRESS_CACHE_TTL_PROPERTY, "-1");
@ -531,6 +528,75 @@ public class DnsNameResolverTest {
verify(mockResolver, times(2)).resolveAddress(anyString());
}
@Test
public void resolve_emptyResult() {
DnsNameResolver nr = newResolver("dns:///addr.fake:1234", 443);
nr.setAddressResolver(new AddressResolver() {
@Override
public List<InetAddress> resolveAddress(String host) throws Exception {
return Collections.emptyList();
}
});
nr.setResourceResolver(new ResourceResolver() {
@Override
public List<String> resolveTxt(String host) throws Exception {
return Collections.emptyList();
}
@Override
public List<EquivalentAddressGroup> resolveSrv(AddressResolver addressResolver, String host)
throws Exception {
return Collections.emptyList();
}
});
nr.start(mockListener);
assertThat(fakeExecutor.runDueTasks()).isEqualTo(1);
ArgumentCaptor<ResolutionResult> ac = ArgumentCaptor.forClass(ResolutionResult.class);
verify(mockListener).onResult(ac.capture());
verifyNoMoreInteractions(mockListener);
assertThat(ac.getValue().getAddresses()).isEmpty();
assertThat(ac.getValue().getAttributes()).isEqualTo(Attributes.EMPTY);
assertThat(ac.getValue().getServiceConfig()).isNull();
}
@SuppressWarnings("deprecation")
@Test
public void resolve_balancerAddrsAsAttributes() throws Exception {
InetAddress backendAddr = InetAddress.getByAddress(new byte[] {127, 0, 0, 0});
final EquivalentAddressGroup balancerAddr =
new EquivalentAddressGroup(
new SocketAddress() {},
Attributes.newBuilder()
.set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, "foo.example.com")
.build());
String name = "foo.googleapis.com";
AddressResolver mockAddressResolver = mock(AddressResolver.class);
when(mockAddressResolver.resolveAddress(anyString()))
.thenReturn(Collections.singletonList(backendAddr));
ResourceResolver mockResourceResolver = mock(ResourceResolver.class);
when(mockResourceResolver.resolveTxt(anyString())).thenReturn(Collections.<String>emptyList());
when(mockResourceResolver.resolveSrv(ArgumentMatchers.any(AddressResolver.class), anyString()))
.thenReturn(Collections.singletonList(balancerAddr));
DnsNameResolver resolver = newSrvEnabledResolver(name, 81);
resolver.setAddressResolver(mockAddressResolver);
resolver.setResourceResolver(mockResourceResolver);
resolver.start(mockListener);
assertEquals(1, fakeExecutor.runDueTasks());
verify(mockListener).onResult(resultCaptor.capture());
ResolutionResult result = resultCaptor.getValue();
InetSocketAddress resolvedBackendAddr =
(InetSocketAddress) Iterables.getOnlyElement(
Iterables.getOnlyElement(result.getAddresses()).getAddresses());
assertThat(resolvedBackendAddr.getAddress()).isEqualTo(backendAddr);
assertThat(result.getAttributes().get(GrpcAttributes.ATTR_LB_ADDRS))
.containsExactly(balancerAddr);
}
@Test
public void resolveAll_nullResourceResolver() throws Exception {
final String hostname = "addr.fake";

View File

@ -24,7 +24,6 @@ import static org.mockito.Mockito.when;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.internal.DnsNameResolver.AddressResolver;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.JndiResourceResolverFactory.JndiRecordFetcher;
import io.grpc.internal.JndiResourceResolverFactory.JndiResourceResolver;
import io.grpc.internal.JndiResourceResolverFactory.RecordFetcher;
@ -81,6 +80,7 @@ public class JndiResourceResolverTest {
assertThat(resolver.resolveTxt("service.example.com")).isEqualTo(golden);
}
@SuppressWarnings("deprecation")
@Test
public void srvRecordLookup() throws Exception {
AddressResolver addressResolver = mock(AddressResolver.class);

View File

@ -20,6 +20,7 @@ import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.ExperimentalApi;
import io.grpc.Metadata;
import java.util.List;
/**
* Constants for the GRPCLB load-balancer.
@ -41,5 +42,15 @@ public final class GrpclbConstants {
static final Attributes.Key<String> TOKEN_ATTRIBUTE_KEY =
Attributes.Key.create("lb-token");
@SuppressWarnings("deprecation")
@EquivalentAddressGroup.Attr
static final Attributes.Key<List<EquivalentAddressGroup>> ATTR_LB_ADDRS =
io.grpc.internal.GrpcAttributes.ATTR_LB_ADDRS;
@SuppressWarnings("deprecation")
@EquivalentAddressGroup.Attr
static final Attributes.Key<String> ATTR_LB_ADDR_AUTHORITY =
io.grpc.internal.GrpcAttributes.ATTR_LB_ADDR_AUTHORITY;
private GrpclbConstants() { }
}

View File

@ -91,22 +91,30 @@ class GrpclbLoadBalancer extends LoadBalancer {
@Override
@SuppressWarnings("deprecation") // TODO(creamsoup) migrate to use parsed service config
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> updatedServers = resolvedAddresses.getAddresses();
Attributes attributes = resolvedAddresses.getAttributes();
// LB addresses and backend addresses are treated separately
List<EquivalentAddressGroup> newLbAddresses = attributes.get(GrpcAttributes.ATTR_LB_ADDRS);
if ((newLbAddresses == null || newLbAddresses.isEmpty())
&& resolvedAddresses.getAddresses().isEmpty()) {
handleNameResolutionError(
Status.UNAVAILABLE.withDescription("No backend or balancer addresses found"));
return;
}
List<LbAddressGroup> newLbAddressGroups = new ArrayList<>();
List<EquivalentAddressGroup> newBackendServers = new ArrayList<>();
for (EquivalentAddressGroup server : updatedServers) {
String lbAddrAuthority = server.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY);
if (lbAddrAuthority != null) {
newLbAddressGroups.add(new LbAddressGroup(server, lbAddrAuthority));
} else {
newBackendServers.add(server);
if (newLbAddresses != null) {
for (EquivalentAddressGroup lbAddr : newLbAddresses) {
String lbAddrAuthority = lbAddr.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
if (lbAddrAuthority == null) {
throw new AssertionError(
"This is a bug: LB address " + lbAddr + " does not have an authority.");
}
newLbAddressGroups.add(new LbAddressGroup(lbAddr, lbAddrAuthority));
}
}
newLbAddressGroups = Collections.unmodifiableList(newLbAddressGroups);
newBackendServers = Collections.unmodifiableList(newBackendServers);
List<EquivalentAddressGroup> newBackendServers =
Collections.unmodifiableList(resolvedAddresses.getAddresses());
Map<String, ?> rawLbConfigValue = attributes.get(ATTR_LOAD_BALANCING_CONFIG);
Mode newMode = retrieveModeFromLbConfig(rawLbConfigValue, helper.getChannelLogger());
if (!mode.equals(newMode)) {
@ -184,6 +192,11 @@ class GrpclbLoadBalancer extends LoadBalancer {
}
}
@Override
public boolean canHandleEmptyAddressListFromNameResolution() {
return true;
}
@VisibleForTesting
@Nullable
GrpclbState getGrpclbState() {

View File

@ -794,7 +794,7 @@ final class GrpclbState {
// actually used in the normal case. https://github.com/grpc/grpc-java/issues/4618 should allow
// this to be more obvious.
Attributes attrs = Attributes.newBuilder()
.set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority)
.set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, authority)
.build();
return new LbAddressGroup(flattenEquivalentAddressGroup(eags, attrs), authority);
}

View File

@ -468,9 +468,11 @@ public class GrpclbLoadBalancerTest {
when(args.getHeaders()).thenReturn(headers);
long loadReportIntervalMillis = 1983;
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList, grpclbResolutionAttrs);
// Fallback timer is started as soon as address is resolved.
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
@ -693,9 +695,11 @@ public class GrpclbLoadBalancerTest {
PickSubchannelArgs args = mock(PickSubchannelArgs.class);
when(args.getHeaders()).thenReturn(headers);
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList, grpclbResolutionAttrs);
assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
@ -731,9 +735,11 @@ public class GrpclbLoadBalancerTest {
PickSubchannelArgs args = mock(PickSubchannelArgs.class);
when(args.getHeaders()).thenReturn(headers);
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList, grpclbResolutionAttrs);
assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
@ -785,6 +791,20 @@ public class GrpclbLoadBalancerTest {
.build()));
}
@Test
public void receiveNoBackendAndBalancerAddress() {
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
Collections.<EquivalentAddressGroup>emptyList(),
Attributes.EMPTY);
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker.dropList).isEmpty();
Status error = Iterables.getOnlyElement(picker.pickList).picked(new Metadata()).getStatus();
assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(error.getDescription()).isEqualTo("No backend or balancer addresses found");
}
@Test
public void nameResolutionFailsThenRecover() {
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
@ -803,11 +823,12 @@ public class GrpclbLoadBalancerTest {
assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
// Recover with a subsequent success
List<EquivalentAddressGroup> resolvedServers = createResolvedServerAddresses(true);
EquivalentAddressGroup eag = resolvedServers.get(0);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
EquivalentAddressGroup eag = grpclbBalancerList.get(0);
Attributes resolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(resolvedServers, resolutionAttrs);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList, resolutionAttrs);
verify(helper).createOobChannel(eq(eag), eq(lbAuthority(0)));
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
@ -817,11 +838,13 @@ public class GrpclbLoadBalancerTest {
public void grpclbThenNameResolutionFails() {
InOrder inOrder = inOrder(helper, subchannelPool);
// Go to GRPCLB first
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList, grpclbResolutionAttrs);
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
@ -854,59 +877,63 @@ public class GrpclbLoadBalancerTest {
@Test
public void grpclbUpdatedAddresses_avoidsReconnect() {
List<EquivalentAddressGroup> grpclbResolutionList =
createResolvedServerAddresses(true, false);
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(1);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs);
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
ManagedChannel oobChannel = fakeOobChannels.poll();
assertEquals(1, lbRequestObservers.size());
List<EquivalentAddressGroup> grpclbResolutionList2 =
createResolvedServerAddresses(true, false, true);
List<EquivalentAddressGroup> backendList2 = createResolvedBackendAddresses(1);
List<EquivalentAddressGroup> grpclbBalancerList2 = createResolvedBalancerAddresses(2);
EquivalentAddressGroup combinedEag = new EquivalentAddressGroup(Arrays.asList(
grpclbResolutionList2.get(0).getAddresses().get(0),
grpclbResolutionList2.get(2).getAddresses().get(0)),
grpclbBalancerList2.get(0).getAddresses().get(0),
grpclbBalancerList2.get(1).getAddresses().get(0)),
lbAttributes(lbAuthority(0)));
deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs);
deliverResolvedAddresses(backendList2, grpclbBalancerList2, grpclbResolutionAttrs);
verify(helper).updateOobChannelAddresses(eq(oobChannel), eq(combinedEag));
assertEquals(1, lbRequestObservers.size()); // No additional RPC
}
@Test
public void grpclbUpdatedAddresses_reconnectOnAuthorityChange() {
List<EquivalentAddressGroup> grpclbResolutionList =
createResolvedServerAddresses(true, false);
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(1);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs);
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
ManagedChannel oobChannel = fakeOobChannels.poll();
assertEquals(1, lbRequestObservers.size());
final String newAuthority = "some-new-authority";
List<EquivalentAddressGroup> grpclbResolutionList2 =
createResolvedServerAddresses(false);
grpclbResolutionList2.add(new EquivalentAddressGroup(
List<EquivalentAddressGroup> backendList2 = createResolvedBackendAddresses(1);
List<EquivalentAddressGroup> grpclbBalancerList2 =
Collections.singletonList(
new EquivalentAddressGroup(
new FakeSocketAddress("somethingNew"), lbAttributes(newAuthority)));
deliverResolvedAddresses(grpclbResolutionList2, grpclbResolutionAttrs);
deliverResolvedAddresses(
backendList2, grpclbBalancerList2, grpclbResolutionAttrs);
assertTrue(oobChannel.isTerminated());
verify(helper).createOobChannel(eq(grpclbResolutionList2.get(1)), eq(newAuthority));
verify(helper).createOobChannel(eq(grpclbBalancerList2.get(0)), eq(newAuthority));
assertEquals(2, lbRequestObservers.size()); // An additional RPC
}
@Test
public void grpclbWorking() {
InOrder inOrder = inOrder(helper, subchannelPool);
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList, grpclbResolutionAttrs);
// Fallback timer is started as soon as the addresses are resolved.
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
@ -1174,13 +1201,14 @@ public class GrpclbLoadBalancerTest {
long loadReportIntervalMillis = 1983;
InOrder inOrder = inOrder(helper, subchannelPool);
// Create a resolution list with a mixture of balancer and backend addresses
List<EquivalentAddressGroup> resolutionList =
createResolvedServerAddresses(false, true, false);
// Create balancer and backend addresses
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes resolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(resolutionList, resolutionAttrs);
deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs);
inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
inOrder.verify(helper)
.createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
// Attempted to connect to balancer
assertEquals(1, fakeOobChannels.size());
@ -1214,11 +1242,11 @@ public class GrpclbLoadBalancerTest {
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
List<EquivalentAddressGroup> fallbackList =
Arrays.asList(resolutionList.get(0), resolutionList.get(2));
Arrays.asList(backendList.get(0), backendList.get(1));
assertThat(logs).containsExactly(
"INFO: Using fallback backends",
"INFO: Using RR list=[[[FakeSocketAddress-fake-address-0]/{}], "
+ "[[FakeSocketAddress-fake-address-2]/{}]], drop=[null, null]",
+ "[[FakeSocketAddress-fake-address-1]/{}]], drop=[null, null]",
"INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder();
// Fall back to the backends from resolver
@ -1228,19 +1256,20 @@ public class GrpclbLoadBalancerTest {
verify(lbRequestObserver, never()).onCompleted();
}
////////////////////////////////////////////////////////
// Name resolver sends new list without any backend addr
////////////////////////////////////////////////////////
resolutionList = createResolvedServerAddresses(true, true);
deliverResolvedAddresses(resolutionList, resolutionAttrs);
//////////////////////////////////////////////////////////////////////
// Name resolver sends new resolution results without any backend addr
//////////////////////////////////////////////////////////////////////
grpclbBalancerList = createResolvedBalancerAddresses(2);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),grpclbBalancerList, resolutionAttrs);
// New addresses are updated to the OobChannel
inOrder.verify(helper).updateOobChannelAddresses(
same(oobChannel),
eq(new EquivalentAddressGroup(
Arrays.asList(
resolutionList.get(0).getAddresses().get(0),
resolutionList.get(1).getAddresses().get(0)),
grpclbBalancerList.get(0).getAddresses().get(0),
grpclbBalancerList.get(1).getAddresses().get(0)),
lbAttributes(lbAuthority(0)))));
if (timerExpires) {
@ -1249,21 +1278,22 @@ public class GrpclbLoadBalancerTest {
inOrder, Collections.<EquivalentAddressGroup>emptyList());
}
//////////////////////////////////////////////////
// Name resolver sends new list with backend addrs
//////////////////////////////////////////////////
resolutionList = createResolvedServerAddresses(true, false, false);
deliverResolvedAddresses(resolutionList, resolutionAttrs);
////////////////////////////////////////////////////////////////
// Name resolver sends new resolution results with backend addrs
////////////////////////////////////////////////////////////////
backendList = createResolvedBackendAddresses(2);
grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs);
// New LB address is updated to the OobChannel
inOrder.verify(helper).updateOobChannelAddresses(
same(oobChannel),
eq(resolutionList.get(0)));
eq(grpclbBalancerList.get(0)));
if (timerExpires) {
// New backend addresses are used for fallback
fallbackTestVerifyUseOfFallbackBackendLists(
inOrder, Arrays.asList(resolutionList.get(1), resolutionList.get(2)));
inOrder, Arrays.asList(backendList.get(0), backendList.get(1)));
}
////////////////////////////////////////////////
@ -1302,8 +1332,9 @@ public class GrpclbLoadBalancerTest {
///////////////////////////////////////////////////////////////
// New backend addresses from resolver outside of fallback mode
///////////////////////////////////////////////////////////////
resolutionList = createResolvedServerAddresses(true, false);
deliverResolvedAddresses(resolutionList, resolutionAttrs);
backendList = createResolvedBackendAddresses(1);
grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs);
// Will not affect the round robin list at all
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
@ -1317,13 +1348,13 @@ public class GrpclbLoadBalancerTest {
long loadReportIntervalMillis = 1983;
InOrder inOrder = inOrder(helper, subchannelPool);
// Create a resolution list with a mixture of balancer and backend addresses
List<EquivalentAddressGroup> resolutionList =
createResolvedServerAddresses(false, true, false);
// Create balancer and backend addresses
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes resolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(resolutionList, resolutionAttrs);
deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs);
inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
inOrder.verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
// Attempted to connect to balancer
assertThat(fakeOobChannels).hasSize(1);
@ -1353,7 +1384,7 @@ public class GrpclbLoadBalancerTest {
// Fall back to the backends from resolver
fallbackTestVerifyUseOfFallbackBackendLists(
inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
inOrder, Arrays.asList(backendList.get(0), backendList.get(1)));
// A new stream is created
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
@ -1369,10 +1400,11 @@ public class GrpclbLoadBalancerTest {
public void grpclbFallback_noBalancerAddress() {
InOrder inOrder = inOrder(helper, subchannelPool);
// Create a resolution list with just backend addresses
List<EquivalentAddressGroup> resolutionList = createResolvedServerAddresses(false, false);
// Create just backend addresses
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
Attributes resolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(resolutionList, resolutionAttrs);
deliverResolvedAddresses(
backendList, Collections.<EquivalentAddressGroup>emptyList(), resolutionAttrs);
assertThat(logs).containsExactly(
"INFO: Using fallback backends",
@ -1381,7 +1413,7 @@ public class GrpclbLoadBalancerTest {
"INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder();
// Fall back to the backends from resolver
fallbackTestVerifyUseOfFallbackBackendLists(inOrder, resolutionList);
fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList);
// No fallback timeout timer scheduled.
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
@ -1410,13 +1442,13 @@ public class GrpclbLoadBalancerTest {
long loadReportIntervalMillis = 1983;
InOrder inOrder = inOrder(helper, mockLbService, subchannelPool);
// Create a resolution list with a mixture of balancer and backend addresses
List<EquivalentAddressGroup> resolutionList =
createResolvedServerAddresses(false, true, false);
// Create balancer and backend addresses
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes resolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(resolutionList, resolutionAttrs);
deliverResolvedAddresses(backendList, grpclbBalancerList, resolutionAttrs);
inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
inOrder.verify(helper).createOobChannel(eq(grpclbBalancerList.get(0)), eq(lbAuthority(0)));
// Attempted to connect to balancer
assertEquals(1, fakeOobChannels.size());
@ -1465,7 +1497,7 @@ public class GrpclbLoadBalancerTest {
if (balancerBroken && allSubchannelsBroken) {
// Going into fallback
subchannels = fallbackTestVerifyUseOfFallbackBackendLists(
inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
inOrder, Arrays.asList(backendList.get(0), backendList.get(1)));
// When in fallback mode, fallback timer should not be scheduled when all backend
// connections are lost
@ -1486,9 +1518,9 @@ public class GrpclbLoadBalancerTest {
if (!(balancerBroken && allSubchannelsBroken)) {
verify(subchannelPool, never()).takeOrCreateSubchannel(
eq(resolutionList.get(0)), any(Attributes.class));
eq(backendList.get(0)), any(Attributes.class));
verify(subchannelPool, never()).takeOrCreateSubchannel(
eq(resolutionList.get(2)), any(Attributes.class));
eq(backendList.get(1)), any(Attributes.class));
}
}
@ -1555,15 +1587,15 @@ public class GrpclbLoadBalancerTest {
@Test
public void grpclbMultipleAuthorities() throws Exception {
List<EquivalentAddressGroup> grpclbResolutionList = Arrays.asList(
List<EquivalentAddressGroup> backendList = Collections.singletonList(
new EquivalentAddressGroup(new FakeSocketAddress("not-a-lb-address")));
List<EquivalentAddressGroup> grpclbBalancerList = Arrays.asList(
new EquivalentAddressGroup(
new FakeSocketAddress("fake-address-1"),
lbAttributes("fake-authority-1")),
new EquivalentAddressGroup(
new FakeSocketAddress("fake-address-2"),
lbAttributes("fake-authority-2")),
new EquivalentAddressGroup(
new FakeSocketAddress("not-a-lb-address")),
new EquivalentAddressGroup(
new FakeSocketAddress("fake-address-3"),
lbAttributes("fake-authority-1")));
@ -1574,7 +1606,7 @@ public class GrpclbLoadBalancerTest {
lbAttributes("fake-authority-1")); // Supporting multiple authorities would be good, one day
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs);
verify(helper).createOobChannel(goldenOobChannelEag, "fake-authority-1");
}
@ -1588,9 +1620,11 @@ public class GrpclbLoadBalancerTest {
.build();
InOrder inOrder =
inOrder(mockLbService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2, helper);
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList, grpclbResolutionAttrs);
assertEquals(1, fakeOobChannels.size());
@SuppressWarnings("unused")
@ -1693,11 +1727,13 @@ public class GrpclbLoadBalancerTest {
InOrder inOrder = inOrder(helper);
String lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}";
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.newBuilder().set(
LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList, grpclbResolutionAttrs);
assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
@ -1825,10 +1861,12 @@ public class GrpclbLoadBalancerTest {
@SuppressWarnings("deprecation") // TODO(creamsoup) use parsed object
private void subtestShutdownWithoutSubchannel(String childPolicy) throws Exception {
String lbConfig = "{\"childPolicy\" : [ {\"" + childPolicy + "\" : {}} ]}";
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.newBuilder().set(
LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList, grpclbResolutionAttrs);
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> requestObserver = lbRequestObservers.poll();
@ -1848,12 +1886,12 @@ public class GrpclbLoadBalancerTest {
String lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}";
// Name resolver returns a mix of balancer and backend addresses
List<EquivalentAddressGroup> grpclbResolutionList =
createResolvedServerAddresses(false, true, false);
// Name resolver returns balancer and backend addresses
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.newBuilder().set(
LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs);
// Attempted to connect to balancer
assertEquals(1, fakeOobChannels.size());
@ -1868,7 +1906,7 @@ public class GrpclbLoadBalancerTest {
// TODO(zhangkun83): remove the deprecation suppression on this method once migrated to
// the new createSubchannel().
inOrder.verify(helper).createSubchannel(
eq(Arrays.asList(grpclbResolutionList.get(0), grpclbResolutionList.get(2))),
eq(Arrays.asList(backendList.get(0), backendList.get(1))),
any(Attributes.class));
assertThat(mockSubchannels).hasSize(1);
@ -1927,11 +1965,13 @@ public class GrpclbLoadBalancerTest {
InOrder inOrder = inOrder(helper);
String lbConfig = "{\"childPolicy\" : [ {\"round_robin\" : {}} ]}";
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
Attributes grpclbResolutionAttrs = Attributes.newBuilder().set(
LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList, grpclbResolutionAttrs);
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
@ -1971,7 +2011,9 @@ public class GrpclbLoadBalancerTest {
lbConfig = "{\"childPolicy\" : [ {\"pick_first\" : {}} ]}";
grpclbResolutionAttrs = Attributes.newBuilder().set(
LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, parseJsonObject(lbConfig)).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList, grpclbResolutionAttrs);
// GrpclbState will be shutdown, and a new one will be created
@ -2088,19 +2130,18 @@ public class GrpclbLoadBalancerTest {
@Test
public void grpclbWorking_lbSendsFallbackMessage() {
InOrder inOrder = inOrder(helper, subchannelPool);
List<EquivalentAddressGroup> grpclbResolutionList =
createResolvedServerAddresses(true, true, false, false);
List<EquivalentAddressGroup> fallbackEags = grpclbResolutionList.subList(2, 4);
List<EquivalentAddressGroup> backendList = createResolvedBackendAddresses(2);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(2);
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
deliverResolvedAddresses(backendList, grpclbBalancerList, grpclbResolutionAttrs);
// Fallback timer is started as soon as the addresses are resolved.
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
List<SocketAddress> addrs = new ArrayList<>();
addrs.addAll(grpclbResolutionList.get(0).getAddresses());
addrs.addAll(grpclbResolutionList.get(1).getAddresses());
Attributes attr = grpclbResolutionList.get(0).getAttributes();
addrs.addAll(grpclbBalancerList.get(0).getAddresses());
addrs.addAll(grpclbBalancerList.get(1).getAddresses());
Attributes attr = grpclbBalancerList.get(0).getAttributes();
EquivalentAddressGroup oobChannelEag = new EquivalentAddressGroup(addrs, attr);
verify(helper).createOobChannel(eq(oobChannelEag), eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
@ -2206,7 +2247,7 @@ public class GrpclbLoadBalancerTest {
.returnSubchannel(eq(subchannel2), eq(ConnectivityStateInfo.forNonError(READY)));
// verify fallback
fallbackTestVerifyUseOfFallbackBackendLists(inOrder, fallbackEags);
fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList);
assertFalse(oobChannel.isShutdown());
verify(lbRequestObserver, never()).onCompleted();
@ -2312,12 +2353,21 @@ public class GrpclbLoadBalancerTest {
}
private void deliverResolvedAddresses(
final List<EquivalentAddressGroup> addrs, final Attributes attrs) {
final List<EquivalentAddressGroup> backendAddrs,
final List<EquivalentAddressGroup> balancerAddrs,
Attributes attrs) {
if (!balancerAddrs.isEmpty()) {
attrs = attrs.toBuilder().set(GrpclbConstants.ATTR_LB_ADDRS, balancerAddrs).build();
}
final Attributes finalAttrs = attrs;
syncContext.execute(new Runnable() {
@Override
public void run() {
balancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(addrs).setAttributes(attrs).build());
ResolvedAddresses.newBuilder()
.setAddresses(backendAddrs)
.setAttributes(finalAttrs)
.build());
}
});
}
@ -2326,15 +2376,20 @@ public class GrpclbLoadBalancerTest {
return balancer.getGrpclbState().getLoadRecorder();
}
private static List<EquivalentAddressGroup> createResolvedServerAddresses(boolean ... isLb) {
ArrayList<EquivalentAddressGroup> list = new ArrayList<>();
for (int i = 0; i < isLb.length; i++) {
private static List<EquivalentAddressGroup> createResolvedBackendAddresses(int n) {
List<EquivalentAddressGroup> list = new ArrayList<>();
for (int i = 0; i < n; i++) {
SocketAddress addr = new FakeSocketAddress("fake-address-" + i);
EquivalentAddressGroup eag =
new EquivalentAddressGroup(
addr,
isLb[i] ? lbAttributes(lbAuthority(i)) : Attributes.EMPTY);
list.add(eag);
list.add(new EquivalentAddressGroup(addr));
}
return list;
}
private static List<EquivalentAddressGroup> createResolvedBalancerAddresses(int n) {
List<EquivalentAddressGroup> list = new ArrayList<>();
for (int i = 0; i < n; i++) {
SocketAddress addr = new FakeSocketAddress("fake-address-" + i);
list.add(new EquivalentAddressGroup(addr, lbAttributes(lbAuthority(i))));
}
return list;
}
@ -2346,7 +2401,7 @@ public class GrpclbLoadBalancerTest {
private static Attributes lbAttributes(String authority) {
return Attributes.newBuilder()
.set(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY, authority)
.set(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY, authority)
.build();
}

View File

@ -61,6 +61,7 @@ final class FallbackLb extends ForwardingLoadBalancer {
return fallbackPolicyLb;
}
@SuppressWarnings("deprecation")
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
Attributes attributes = resolvedAddresses.getAttributes();
@ -113,6 +114,8 @@ final class FallbackLb extends ForwardingLoadBalancer {
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
// Some addresses in the list may be grpclb-v1 balancer addresses, so if the fallback policy
// does not support grpclb-v1 balancer addresses, then we need to exclude them from the list.
// TODO(chengyuanzhang): delete the following logic after changing internal resolver
// to not include grpclb server addresses.
if (!newFallbackPolicyName.equals("grpclb") && !newFallbackPolicyName.equals(XDS_POLICY_NAME)) {
ImmutableList.Builder<EquivalentAddressGroup> backends = ImmutableList.builder();
for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) {