xds: eliminate special code path for EDS-only workflow (#6931)

Delete special logics (e.g., fallback) for EDS-only workflow and use the same format of lb config for running EDS-only workflow as running the full CDS-EDS workflow.
This commit is contained in:
Chengyuan Zhang 2020-04-17 17:42:47 +00:00 committed by GitHub
parent 52a72e2dcd
commit 03db20cded
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 258 additions and 1449 deletions

View File

@ -16,7 +16,6 @@
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
@ -57,13 +56,12 @@ final class EdsLoadBalancer extends LoadBalancer {
private final InternalLogId logId;
private final XdsLogger logger;
private final ResourceUpdateCallback resourceUpdateCallback;
private final GracefulSwitchLoadBalancer switchingLoadBalancer;
private final LoadBalancerRegistry lbRegistry;
private final LocalityStoreFactory localityStoreFactory;
private final Bootstrapper bootstrapper;
private final XdsChannelFactory channelFactory;
private final Helper edsLbHelper;
private final Helper helper;
@Nullable
private ObjectPool<XdsClient> xdsClientPool;
@ -72,12 +70,11 @@ final class EdsLoadBalancer extends LoadBalancer {
@Nullable
private String clusterName;
@Nullable
private EdsConfig config;
private String edsServiceName;
EdsLoadBalancer(Helper edsLbHelper, ResourceUpdateCallback resourceUpdateCallback) {
EdsLoadBalancer(Helper helper) {
this(
edsLbHelper,
resourceUpdateCallback,
helper,
LoadBalancerRegistry.getDefaultRegistry(),
LocalityStoreFactory.getInstance(),
Bootstrapper.getInstance(),
@ -86,20 +83,18 @@ final class EdsLoadBalancer extends LoadBalancer {
@VisibleForTesting
EdsLoadBalancer(
Helper edsLbHelper,
ResourceUpdateCallback resourceUpdateCallback,
Helper helper,
LoadBalancerRegistry lbRegistry,
LocalityStoreFactory localityStoreFactory,
Bootstrapper bootstrapper,
XdsChannelFactory channelFactory) {
this.edsLbHelper = checkNotNull(edsLbHelper, "edsLbHelper");
this.resourceUpdateCallback = checkNotNull(resourceUpdateCallback, "resourceUpdateCallback");
this.helper = checkNotNull(helper, "helper");
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
this.localityStoreFactory = checkNotNull(localityStoreFactory, "localityStoreFactory");
this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper");
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
this.switchingLoadBalancer = new GracefulSwitchLoadBalancer(edsLbHelper);
logId = InternalLogId.allocate("eds-lb", edsLbHelper.getAuthority());
this.switchingLoadBalancer = new GracefulSwitchLoadBalancer(helper);
logId = InternalLogId.allocate("eds-lb", helper.getAuthority());
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created");
}
@ -108,7 +103,10 @@ final class EdsLoadBalancer extends LoadBalancer {
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
Object lbConfig = resolvedAddresses.getLoadBalancingPolicyConfig();
checkNotNull(lbConfig, "missing EDS lb config");
if (lbConfig == null) {
handleNameResolutionError(Status.UNAVAILABLE.withDescription("Missing EDS lb config"));
return;
}
EdsConfig newEdsConfig = (EdsConfig) lbConfig;
if (logger.isLoggable(XdsLogLevel.INFO)) {
logger.log(
@ -120,34 +118,20 @@ final class EdsLoadBalancer extends LoadBalancer {
newEdsConfig.edsServiceName,
newEdsConfig.lrsServerName != null);
}
boolean firstUpdate = false;
if (clusterName == null) {
clusterName = newEdsConfig.clusterName;
} else {
checkArgument(
clusterName.equals(newEdsConfig.clusterName),
"cluster name should not change");
firstUpdate = true;
}
clusterName = newEdsConfig.clusterName;
if (xdsClientPool == null) {
// Init xdsClientPool and xdsClient.
// There are two usecases:
// 1. EDS-only:
// The name resolver resolves a ResolvedAddresses with an XdsConfig. Use the bootstrap
// information to create a channel.
// 2. Non EDS-only:
// XDS_CLIENT_POOL attribute is available from ResolvedAddresses either from
// XdsNameResolver or CDS policy.
//
// We assume XdsConfig switching happens only within one usecase, and there is no switching
// between different usecases.
Attributes attributes = resolvedAddresses.getAttributes();
xdsClientPool = attributes.get(XdsAttributes.XDS_CLIENT_POOL);
if (xdsClientPool == null) { // This is the EDS-only usecase.
if (xdsClientPool == null) {
final BootstrapInfo bootstrapInfo;
try {
bootstrapInfo = bootstrapper.readBootstrap();
} catch (Exception e) {
edsLbHelper.updateBalancingState(
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(
Status.UNAVAILABLE.withDescription("Failed to bootstrap").withCause(e)));
@ -157,7 +141,7 @@ final class EdsLoadBalancer extends LoadBalancer {
final List<ServerInfo> serverList = bootstrapInfo.getServers();
final Node node = bootstrapInfo.getNode();
if (serverList.isEmpty()) {
edsLbHelper.updateBalancingState(
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(
Status.UNAVAILABLE
@ -169,12 +153,12 @@ final class EdsLoadBalancer extends LoadBalancer {
XdsClient createXdsClient() {
return
new XdsClientImpl(
edsLbHelper.getAuthority(),
helper.getAuthority(),
serverList,
channelFactory,
node,
edsLbHelper.getSynchronizationContext(),
edsLbHelper.getScheduledExecutorService(),
helper.getSynchronizationContext(),
helper.getScheduledExecutorService(),
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER);
}
@ -188,14 +172,13 @@ final class EdsLoadBalancer extends LoadBalancer {
// Note: childPolicy change will be handled in LocalityStore, to be implemented.
// If edsServiceName in XdsConfig is changed, do a graceful switch.
if (config == null
|| !Objects.equals(newEdsConfig.edsServiceName, config.edsServiceName)) {
if (firstUpdate || !Objects.equals(newEdsConfig.edsServiceName, edsServiceName)) {
LoadBalancer.Factory clusterEndpointsLoadBalancerFactory =
new ClusterEndpointsBalancerFactory(newEdsConfig.edsServiceName);
switchingLoadBalancer.switchTo(clusterEndpointsLoadBalancerFactory);
}
switchingLoadBalancer.handleResolvedAddresses(resolvedAddresses);
this.config = newEdsConfig;
this.edsServiceName = newEdsConfig.edsServiceName;
}
@Override
@ -267,7 +250,7 @@ final class EdsLoadBalancer extends LoadBalancer {
resourceName = clusterServiceName != null ? clusterServiceName : clusterName;
localityStore =
localityStoreFactory.newLocalityStore(logId, helper, lbRegistry, loadStatsStore);
endpointWatcher = new EndpointWatcherImpl(localityStore);
endpointWatcher = new EndpointWatcherImpl();
logger.log(
XdsLogLevel.INFO,
"Start endpoint watcher on {0} with xDS client {1}",
@ -308,9 +291,7 @@ final class EdsLoadBalancer extends LoadBalancer {
@Override
public void handleNameResolutionError(Status error) {
// Go into TRANSIENT_FAILURE if we have not yet received any endpoint update. Otherwise,
// we keep running with the data we had previously.
if (!endpointWatcher.firstEndpointUpdateReceived) {
if (!endpointWatcher.endpointsReceived) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
@ -339,89 +320,60 @@ final class EdsLoadBalancer extends LoadBalancer {
resourceName,
xdsClient);
}
}
}
/**
* Callbacks for the EDS-only-with-fallback usecase. Being deprecated.
*/
interface ResourceUpdateCallback {
private final class EndpointWatcherImpl implements EndpointWatcher {
boolean endpointsReceived;
void onWorking();
@Override
public void onEndpointChanged(EndpointUpdate endpointUpdate) {
logger.log(XdsLogLevel.DEBUG, "Received endpoint update: {0}", endpointUpdate);
if (logger.isLoggable(XdsLogLevel.INFO)) {
logger.log(
XdsLogLevel.INFO,
"Received endpoint update from xDS client {0}: cluster_name={1}, {2} localities, "
+ "{3} drop categories",
xdsClient,
endpointUpdate.getClusterName(),
endpointUpdate.getLocalityLbEndpointsMap().size(),
endpointUpdate.getDropPolicies().size());
}
endpointsReceived = true;
List<DropOverload> dropOverloads = endpointUpdate.getDropPolicies();
ImmutableList.Builder<DropOverload> dropOverloadsBuilder = ImmutableList.builder();
for (DropOverload dropOverload : dropOverloads) {
dropOverloadsBuilder.add(dropOverload);
if (dropOverload.getDropsPerMillion() == 1_000_000) {
break;
}
}
localityStore.updateDropPercentage(dropOverloadsBuilder.build());
void onError();
ImmutableMap.Builder<Locality, LocalityLbEndpoints> localityEndpointsMapping =
new ImmutableMap.Builder<>();
for (Map.Entry<Locality, LocalityLbEndpoints> entry
: endpointUpdate.getLocalityLbEndpointsMap().entrySet()) {
int localityWeight = entry.getValue().getLocalityWeight();
void onAllDrop();
}
private final class EndpointWatcherImpl implements EndpointWatcher {
final LocalityStore localityStore;
boolean firstEndpointUpdateReceived;
EndpointWatcherImpl(LocalityStore localityStore) {
this.localityStore = localityStore;
}
@Override
public void onEndpointChanged(EndpointUpdate endpointUpdate) {
logger.log(XdsLogLevel.DEBUG, "Received endpoint update: {0}", endpointUpdate);
if (logger.isLoggable(XdsLogLevel.INFO)) {
logger.log(
XdsLogLevel.INFO,
"Received endpoint update from xDS client {0}: cluster_name={1}, {2} localities, "
+ "{3} drop categories",
xdsClient,
endpointUpdate.getClusterName(),
endpointUpdate.getLocalityLbEndpointsMap().size(),
endpointUpdate.getDropPolicies().size());
}
if (!firstEndpointUpdateReceived) {
firstEndpointUpdateReceived = true;
resourceUpdateCallback.onWorking();
}
List<DropOverload> dropOverloads = endpointUpdate.getDropPolicies();
ImmutableList.Builder<DropOverload> dropOverloadsBuilder = ImmutableList.builder();
for (DropOverload dropOverload : dropOverloads) {
dropOverloadsBuilder.add(dropOverload);
if (dropOverload.getDropsPerMillion() == 1_000_000) {
resourceUpdateCallback.onAllDrop();
break;
if (localityWeight != 0) {
localityEndpointsMapping.put(entry.getKey(), entry.getValue());
}
}
localityStore.updateLocalityStore(localityEndpointsMapping.build());
}
}
localityStore.updateDropPercentage(dropOverloadsBuilder.build());
ImmutableMap.Builder<Locality, LocalityLbEndpoints> localityEndpointsMapping =
new ImmutableMap.Builder<>();
for (Map.Entry<Locality, LocalityLbEndpoints> entry
: endpointUpdate.getLocalityLbEndpointsMap().entrySet()) {
int localityWeight = entry.getValue().getLocalityWeight();
if (localityWeight != 0) {
localityEndpointsMapping.put(entry.getKey(), entry.getValue());
@Override
public void onError(Status error) {
logger.log(
XdsLogLevel.WARNING,
"Received error from xDS client {0}: {1}: {2}",
xdsClient,
error.getCode(),
error.getDescription());
if (!endpointsReceived) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
}
localityStore.updateLocalityStore(localityEndpointsMapping.build());
}
@Override
public void onError(Status error) {
logger.log(
XdsLogLevel.WARNING,
"Received error from xDS client {0}: {1}: {2}",
xdsClient,
error.getCode(),
error.getDescription());
resourceUpdateCallback.onError();
// If we get an error before getting any valid result, we should put the channel in
// TRANSIENT_FAILURE; if they get an error after getting a valid result, we keep using the
// previous channel state.
if (!firstEndpointUpdateReceived) {
edsLbHelper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
}
}

View File

@ -20,13 +20,20 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.JsonUtil;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.EdsLoadBalancer.ResourceUpdateCallback;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@ -55,24 +62,47 @@ public class EdsLoadBalancerProvider extends LoadBalancerProvider {
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new EdsLoadBalancer(
helper,
new ResourceUpdateCallback() {
@Override
public void onWorking() {}
@Override
public void onError() {}
@Override
public void onAllDrop() {}
});
return new EdsLoadBalancer(helper);
}
@Override
public ConfigOrError parseLoadBalancingPolicyConfig(
Map<String, ?> rawLoadBalancingPolicyConfig) {
throw new UnsupportedOperationException();
LoadBalancerRegistry registry = LoadBalancerRegistry.getDefaultRegistry();
try {
String cluster = JsonUtil.getString(rawLoadBalancingPolicyConfig, "cluster");
if (cluster == null) {
return ConfigOrError.fromError(Status.INTERNAL.withDescription("Cluster name required"));
}
String edsServiceName = JsonUtil.getString(rawLoadBalancingPolicyConfig, "edsServiceName");
String lrsServerName =
JsonUtil.getString(rawLoadBalancingPolicyConfig, "lrsLoadReportingServerName");
// TODO(chengyuanzhang): figure out locality_picking_policy parsing and its default value.
LbConfig roundRobinConfig = new LbConfig("round_robin", ImmutableMap.<String, Object>of());
List<LbConfig> endpointPickingPolicy =
ServiceConfigUtil
.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(
rawLoadBalancingPolicyConfig, "endpointPickingPolicy"));
if (endpointPickingPolicy == null || endpointPickingPolicy.isEmpty()) {
endpointPickingPolicy = Collections.singletonList(roundRobinConfig);
}
ConfigOrError endpointPickingConfigOrError =
ServiceConfigUtil.selectLbPolicyFromList(endpointPickingPolicy, registry);
if (endpointPickingConfigOrError.getError() != null) {
return endpointPickingConfigOrError;
}
PolicySelection endpointPickingSelection =
(PolicySelection) endpointPickingConfigOrError.getConfig();
return ConfigOrError.fromConfig(
new EdsConfig(cluster, edsServiceName, lrsServerName, endpointPickingSelection));
} catch (RuntimeException e) {
return ConfigOrError.fromError(
Status.fromThrowable(e).withDescription(
"Failed to parse EDS LB config: " + rawLoadBalancingPolicyConfig));
}
}
static final class EdsConfig {

View File

@ -1,77 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.util.ForwardingLoadBalancer;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.List;
/** Fallback load balancer. Handles fallback policy changes. */
final class FallbackLb extends ForwardingLoadBalancer {
private final Helper fallbackLbHelper;
private final GracefulSwitchLoadBalancer fallbackPolicyLb;
FallbackLb(Helper fallbackLbHelper) {
this.fallbackLbHelper = checkNotNull(fallbackLbHelper, "fallbackLbHelper");
fallbackPolicyLb = new GracefulSwitchLoadBalancer(fallbackLbHelper);
}
@Override
protected LoadBalancer delegate() {
return fallbackPolicyLb;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
XdsConfig xdsConfig = (XdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
PolicySelection fallbackPolicy = xdsConfig.fallbackPolicy;
if (fallbackPolicy == null) {
// In the latest xDS design, fallback is not supported.
fallbackLbHelper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(Status.UNAVAILABLE.withDescription("Fallback is not supported")));
return;
}
fallbackPolicyLb.switchTo(fallbackPolicy.getProvider());
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
// TODO(zhangkun83): FIXME(#5496): this is a temporary hack.
if (servers.isEmpty()
&& !fallbackPolicyLb.canHandleEmptyAddressListFromNameResolution()) {
fallbackPolicyLb.handleNameResolutionError(Status.UNAVAILABLE.withDescription(
"NameResolver returned no usable address."
+ " addrs=" + resolvedAddresses));
} else {
// TODO(carl-mastrangelo): propagate the load balancing config policy
ResolvedAddresses fallbackResolvedAddresses = resolvedAddresses.toBuilder()
.setAddresses(servers)
.setLoadBalancingPolicyConfig(fallbackPolicy.getConfig())
.build();
fallbackPolicyLb.handleResolvedAddresses(fallbackResolvedAddresses);
}
}
}

View File

@ -1,284 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.EdsLoadBalancer.ResourceUpdateCallback;
import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig;
import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
/**
* A {@link LoadBalancer} that uses the XDS protocol.
*
* <p>This class manages fallback handling. The logic for child policy handling and fallback policy
* handling is provided by EdsLoadBalancer and FallbackLb.
*/
final class XdsLoadBalancer extends LoadBalancer {
private static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); // same as grpclb
private final Helper helper;
private final LoadBalancer primaryLb;
private final LoadBalancer.Factory fallbackLbFactory;
private final ResourceUpdateCallback resourceUpdateCallback = new ResourceUpdateCallback() {
@Override
public void onWorking() {
if (primaryPolicyHasBeenReady) {
// cancel Fallback-After-Startup timer if there's any
cancelFallbackTimer();
}
primaryPolicyWorked = true;
}
@Override
public void onError() {
if (!primaryPolicyWorked) {
// start Fallback-at-Startup immediately
useFallbackPolicy();
} else if (primaryPolicyHasBeenReady) {
// TODO: schedule a timer for Fallback-After-Startup
} // else: the Fallback-at-Startup timer is still pending, noop and wait
}
@Override
public void onAllDrop() {
cancelFallback();
}
};
@Nullable
private LoadBalancer fallbackLb;
@Nullable
private ResolvedAddresses resolvedAddresses;
// Scheduled only once. Never reset to null.
@CheckForNull
private ScheduledHandle fallbackTimer;
private boolean primaryPolicyWorked;
private boolean primaryPolicyHasBeenReady;
XdsLoadBalancer(Helper helper) {
this(helper, new EdsLoadBalancerFactory(), new FallbackLbFactory());
}
@VisibleForTesting
XdsLoadBalancer(
Helper helper,
PrimaryLbFactory primaryLbFactory,
LoadBalancer.Factory fallbackLbFactory) {
this.helper = checkNotNull(helper, "helper");
this.primaryLb = primaryLbFactory.newLoadBalancer(
new PrimaryLbHelper(), resourceUpdateCallback);
this.fallbackLbFactory = fallbackLbFactory;
}
@Override
public boolean canHandleEmptyAddressListFromNameResolution() {
// This does not sound correct, but it's fine as we don't support fallback at this moment.
// TODO(zdapeng): revisit it once we officially support fallback.
return true;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
this.resolvedAddresses = resolvedAddresses;
Object lbConfig = resolvedAddresses.getLoadBalancingPolicyConfig();
if (lbConfig == null) {
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(Status.UNAVAILABLE.withDescription("Missing xDS lb config")));
return;
}
XdsConfig newXdsConfig = (XdsConfig) lbConfig;
if (isInFallbackMode()) {
fallbackLb.handleResolvedAddresses(this.resolvedAddresses);
}
if (fallbackTimer == null) {
class EnterFallbackTask implements Runnable {
@Override
public void run() {
useFallbackPolicy();
}
}
fallbackTimer = helper.getSynchronizationContext().schedule(
new EnterFallbackTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS,
helper.getScheduledExecutorService());
}
EdsConfig edsConfig =
new EdsConfig(
helper.getAuthority(),
newXdsConfig.edsServiceName,
newXdsConfig.lrsServerName,
newXdsConfig.childPolicy);
primaryLb.handleResolvedAddresses(
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(edsConfig).build());
}
@Override
public void handleNameResolutionError(Status error) {
primaryLb.handleNameResolutionError(error);
if (isInFallbackMode()) {
fallbackLb.handleNameResolutionError(error);
}
}
@Override
public void requestConnection() {
primaryLb.requestConnection();
if (isInFallbackMode()) {
fallbackLb.requestConnection();
}
}
@Override
public void shutdown() {
helper.getChannelLogger().log(
ChannelLogLevel.INFO, "Shutting down XDS balancer");
primaryLb.shutdown();
cancelFallback();
}
@Deprecated
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
throw new UnsupportedOperationException(
"handleSubchannelState() not supported by XdsLoadBalancer");
}
private void cancelFallbackTimer() {
if (fallbackTimer != null) {
fallbackTimer.cancel();
}
}
private void cancelFallback() {
cancelFallbackTimer();
if (isInFallbackMode()) {
helper.getChannelLogger().log(
ChannelLogLevel.INFO, "Shutting down XDS fallback balancer");
fallbackLb.shutdown();
fallbackLb = null;
}
}
private void useFallbackPolicy() {
if (isInFallbackMode()) {
return;
}
cancelFallbackTimer();
helper.getChannelLogger().log(
ChannelLogLevel.INFO, "Using XDS fallback policy");
FallbackLbHelper fallbackLbHelper = new FallbackLbHelper();
fallbackLb = fallbackLbFactory.newLoadBalancer(fallbackLbHelper);
fallbackLbHelper.balancer = fallbackLb;
fallbackLb.handleResolvedAddresses(resolvedAddresses);
}
/**
* Fallback mode being on indicates that an update from child LBs will be ignored unless the
* update triggers turning off the fallback mode first.
*/
private boolean isInFallbackMode() {
return fallbackLb != null;
}
private final class PrimaryLbHelper extends ForwardingLoadBalancerHelper {
@Override
protected Helper delegate() {
return helper;
}
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
if (newState == ConnectivityState.READY) {
checkState(
primaryPolicyWorked,
"channel goes to READY before the load balancer even worked");
primaryPolicyHasBeenReady = true;
cancelFallback();
}
if (!isInFallbackMode()) {
helper.getChannelLogger().log(
ChannelLogLevel.INFO, "Picker updated - state: {0}, picker: {1}", newState, newPicker);
helper.updateBalancingState(newState, newPicker);
}
}
}
private final class FallbackLbHelper extends ForwardingLoadBalancerHelper {
LoadBalancer balancer;
@Override
protected Helper delegate() {
return helper;
}
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
checkNotNull(balancer, "balancer not set yet");
if (balancer != fallbackLb) {
// ignore updates from a misbehaving shutdown fallback balancer
return;
}
helper.getChannelLogger().log(
ChannelLogLevel.INFO,
"Picker updated - state: {0}, picker: {1}", newState, newPicker);
super.updateBalancingState(newState, newPicker);
}
}
/** Factory of load balancer for the primary policy.*/
// The interface itself is for convenience in test.
@VisibleForTesting
interface PrimaryLbFactory {
LoadBalancer newLoadBalancer(Helper helper, ResourceUpdateCallback resourceUpdateCallback);
}
private static final class EdsLoadBalancerFactory implements PrimaryLbFactory {
@Override
public LoadBalancer newLoadBalancer(
Helper edsLbHelper, ResourceUpdateCallback resourceUpdateCallback) {
return new EdsLoadBalancer(edsLbHelper, resourceUpdateCallback);
}
}
private static final class FallbackLbFactory extends LoadBalancer.Factory {
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new FallbackLb(helper);
}
}
}

View File

@ -1,167 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.JsonUtil;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.LbConfig;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
/**
* The provider for the "xds" balancing policy. This class should not be directly referenced in
* code. The policy should be accessed through {@link io.grpc.LoadBalancerRegistry#getProvider}
* with the name "xds" (currently "xds_experimental").
*/
@Internal
public final class XdsLoadBalancerProvider extends LoadBalancerProvider {
private static final String XDS_POLICY_NAME = "xds_experimental";
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return XDS_POLICY_NAME;
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new XdsLoadBalancer(helper);
}
@Override
public ConfigOrError parseLoadBalancingPolicyConfig(
Map<String, ?> rawLoadBalancingPolicyConfig) {
return parseLoadBalancingConfigPolicy(
rawLoadBalancingPolicyConfig, LoadBalancerRegistry.getDefaultRegistry());
}
static ConfigOrError parseLoadBalancingConfigPolicy(
Map<String, ?> rawLoadBalancingPolicyConfig, LoadBalancerRegistry registry) {
try {
LbConfig roundRobinConfig = new LbConfig("round_robin", ImmutableMap.<String, Object>of());
List<LbConfig> childPolicyConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(rawLoadBalancingPolicyConfig, "childPolicy"));
if (childPolicyConfigs == null || childPolicyConfigs.isEmpty()) {
childPolicyConfigs = Collections.singletonList(roundRobinConfig);
}
ConfigOrError childConfigOrError =
ServiceConfigUtil.selectLbPolicyFromList(childPolicyConfigs, registry);
if (childConfigOrError.getError() != null) {
return childConfigOrError;
}
PolicySelection childPolicy = (PolicySelection) childConfigOrError.getConfig();
List<LbConfig> fallbackConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(rawLoadBalancingPolicyConfig, "fallbackPolicy"));
if (fallbackConfigs == null || fallbackConfigs.isEmpty()) {
fallbackConfigs = Collections.singletonList(roundRobinConfig);
}
ConfigOrError fallbackConfigOrError =
ServiceConfigUtil.selectLbPolicyFromList(fallbackConfigs, registry);
if (fallbackConfigOrError.getError() != null) {
return fallbackConfigOrError;
}
PolicySelection fallbackPolicy = (PolicySelection) fallbackConfigOrError.getConfig();
String edsServiceName = JsonUtil.getString(rawLoadBalancingPolicyConfig, "edsServiceName");
String lrsServerName =
JsonUtil.getString(rawLoadBalancingPolicyConfig, "lrsLoadReportingServerName");
return ConfigOrError.fromConfig(
new XdsConfig(edsServiceName, lrsServerName, childPolicy, fallbackPolicy));
} catch (RuntimeException e) {
return ConfigOrError.fromError(
Status.fromThrowable(e).withDescription(
"Failed to parse XDS LB config: " + rawLoadBalancingPolicyConfig));
}
}
/**
* Represents a successfully parsed and validated LoadBalancingConfig for XDS LB policy.
*/
static final class XdsConfig {
@Nullable
final String edsServiceName;
@Nullable
final String lrsServerName;
final PolicySelection childPolicy; // default to round_robin if not specified in proto
final PolicySelection fallbackPolicy; // default to round_robin if not specified in proto
XdsConfig(
@Nullable String edsServiceName,
@Nullable String lrsServerName,
PolicySelection childPolicy,
PolicySelection fallbackPolicy) {
this.edsServiceName = edsServiceName;
this.lrsServerName = lrsServerName;
this.childPolicy = checkNotNull(childPolicy, "childPolicy");
this.fallbackPolicy = fallbackPolicy;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("edsServiceName", edsServiceName)
.add("lrsServerName", lrsServerName)
.add("childPolicy", childPolicy)
.add("fallbackPolicy", fallbackPolicy)
.toString();
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof XdsConfig)) {
return false;
}
XdsConfig that = (XdsConfig) obj;
return Objects.equal(this.edsServiceName, that.edsServiceName)
&& Objects.equal(this.lrsServerName, that.lrsServerName)
&& Objects.equal(this.childPolicy, that.childPolicy)
&& Objects.equal(this.fallbackPolicy, that.fallbackPolicy);
}
@Override
public int hashCode() {
return Objects.hashCode(
edsServiceName, lrsServerName, childPolicy, fallbackPolicy);
}
}
}

View File

@ -1,5 +1,4 @@
io.grpc.xds.CdsLoadBalancerProvider
io.grpc.xds.EdsLoadBalancerProvider
io.grpc.xds.WeightedTargetLoadBalancerProvider
io.grpc.xds.XdsLoadBalancerProvider
io.grpc.xds.XdsRoutingLoadBalancerProvider

View File

@ -0,0 +1,48 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.internal.JsonParser;
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import java.io.IOException;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit tests for {@link CdsLoadBalancerProvider}. */
@RunWith(JUnit4.class)
public class CdsLoadBalancerProviderTest {
@Test
public void parseCdsLoadBalancingPolicyConfig() throws IOException {
CdsLoadBalancerProvider provider = new CdsLoadBalancerProvider();
String rawCdsLbConfig = "{\n"
+ " \"cluster\": \"cluster-foo.googleapis.com\"\n"
+ "}";
@SuppressWarnings("unchecked")
Map<String, ?> rawLbConfigMap = (Map<String, ?>) JsonParser.parse(rawCdsLbConfig);
ConfigOrError result = provider.parseLoadBalancingPolicyConfig(rawLbConfigMap);
assertThat(result.getConfig()).isNotNull();
CdsConfig config = (CdsConfig) result.getConfig();
assertThat(config.name).isEqualTo("cluster-foo.googleapis.com");
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.internal.JsonParser;
import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig;
import java.io.IOException;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit tests for {@link EdsLoadBalancerProvider}. */
@RunWith(JUnit4.class)
public class EdsLoadBalancerProviderTest {
private final EdsLoadBalancerProvider provider = new EdsLoadBalancerProvider();
@Test
public void parseEdsLoadBalancingPolicyConfig() throws IOException {
String rawEdsLbConfig = "{\n"
+ " \"cluster\": \"cluster-foo.googleapis.com\",\n"
+ " \"edsServiceName\": \"cluster-foo:service-blade\",\n"
+ " \"lrsLoadReportingServerName\": \"trafficdirector.googleapis.com\",\n"
+ " \"endpointPickingPolicy\": [\n"
+ " { \"policy_foo\": {} },\n"
+ " { \"pick_first\": {} }\n"
+ " ]\n"
+ "}";
@SuppressWarnings("unchecked")
Map<String, ?> rawLbConfigMap = (Map<String, ?>) JsonParser.parse(rawEdsLbConfig);
ConfigOrError result = provider.parseLoadBalancingPolicyConfig(rawLbConfigMap);
assertThat(result.getConfig()).isNotNull();
EdsConfig config = (EdsConfig) result.getConfig();
assertThat(config.clusterName).isEqualTo("cluster-foo.googleapis.com");
assertThat(config.edsServiceName).isEqualTo("cluster-foo:service-blade");
assertThat(config.lrsServerName).isEqualTo("trafficdirector.googleapis.com");
assertThat(config.endpointPickingPolicy.getProvider().getPolicyName())
.isEqualTo("pick_first");
}
@Test
public void parseEdsLoadBalancingPolicyConfig_defaultEndpointPickingPolicy_roundRobin()
throws IOException {
String rawEdsLbConfig = "{\n"
+ " \"cluster\": \"cluster-foo.googleapis.com\",\n"
+ " \"edsServiceName\": \"cluster-foo:service-blade\",\n"
+ " \"lrsLoadReportingServerName\": \"trafficdirector.googleapis.com\"\n"
+ "}";
@SuppressWarnings("unchecked")
Map<String, ?> rawLbConfigMap = (Map<String, ?>) JsonParser.parse(rawEdsLbConfig);
ConfigOrError result = provider.parseLoadBalancingPolicyConfig(rawLbConfigMap);
assertThat(result.getConfig()).isNotNull();
EdsConfig config = (EdsConfig) result.getConfig();
assertThat(config.endpointPickingPolicy.getProvider().getPolicyName())
.isEqualTo("round_robin");
}
}

View File

@ -74,7 +74,6 @@ import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EdsLoadBalancer.ResourceUpdateCallback;
import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig;
import io.grpc.xds.LocalityStore.LocalityStoreFactory;
import io.grpc.xds.XdsClient.EndpointUpdate;
@ -147,8 +146,6 @@ public class EdsLoadBalancerTest {
@Mock
private Helper helper;
@Mock
private ResourceUpdateCallback resourceUpdateCallback;
@Mock
private Bootstrapper bootstrapper;
@Captor
ArgumentCaptor<ConnectivityState> connectivityStateCaptor;
@ -248,9 +245,8 @@ public class EdsLoadBalancerTest {
fakeClock.getStopwatchSupplier()));
}
edsLb = new EdsLoadBalancer(
helper, resourceUpdateCallback, lbRegistry, localityStoreFactory, bootstrapper,
channelFactory);
edsLb =
new EdsLoadBalancer(helper, lbRegistry, localityStoreFactory, bootstrapper, channelFactory);
}
@After
@ -289,7 +285,7 @@ public class EdsLoadBalancerTest {
buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.of(buildDropOverload("throttle", 1000)));
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
// handleResolutionError() after receiving endpoint update.
edsLb.handleNameResolutionError(Status.DATA_LOSS.withDescription("fake status"));
@ -299,7 +295,7 @@ public class EdsLoadBalancerTest {
}
@Test
public void handleEdsServiceNameChangeInXdsConfig() {
public void handleEdsServiceNameChange() {
assertThat(childHelpers).isEmpty();
deliverResolvedAddresses("edsServiceName1", null, fakeEndpointPickingPolicy);
@ -311,7 +307,7 @@ public class EdsLoadBalancerTest {
buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.<DropOverload>of());
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
assertThat(childHelpers).hasSize(1);
Helper childHelper1 = childHelpers.get("subzone1");
LoadBalancer childBalancer1 = childBalancers.get("subzone1");
@ -334,7 +330,7 @@ public class EdsLoadBalancerTest {
buildLbEndpoint("192.168.0.2", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.<DropOverload>of());
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
assertThat(childHelpers).hasSize(2);
Helper childHelper2 = childHelpers.get("subzone2");
LoadBalancer childBalancer2 = childBalancers.get("subzone2");
@ -361,7 +357,7 @@ public class EdsLoadBalancerTest {
buildLbEndpoint("192.168.0.3", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.<DropOverload>of());
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
assertThat(childHelpers).hasSize(3);
Helper childHelper3 = childHelpers.get("subzone3");
@ -389,7 +385,7 @@ public class EdsLoadBalancerTest {
buildLbEndpoint("192.168.0.4", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.<DropOverload>of());
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
assertThat(childHelpers).hasSize(4);
Helper childHelper4 = childHelpers.get("subzone4");
@ -415,7 +411,7 @@ public class EdsLoadBalancerTest {
buildLbEndpoint("192.168.0.5", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.<DropOverload>of());
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
assertThat(childHelpers).hasSize(5);
Helper childHelper5 = childHelpers.get("subzone5");
@ -439,40 +435,6 @@ public class EdsLoadBalancerTest {
verify(childBalancer5, never()).shutdown();
}
@Test
public void firstAndSecondEdsResponseReceived_onWorkingCalledOnce() {
deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy);
verify(resourceUpdateCallback, never()).onWorking();
// first EDS response
ClusterLoadAssignment clusterLoadAssignment =
buildClusterLoadAssignment(CLUSTER_NAME,
ImmutableList.of(
buildLocalityLbEndpoints("region1", "zone1", "subzone1",
ImmutableList.of(
buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.<DropOverload>of());
receiveEndpointUpdate(clusterLoadAssignment);
verify(resourceUpdateCallback).onWorking();
// second EDS response
clusterLoadAssignment =
buildClusterLoadAssignment(CLUSTER_NAME,
ImmutableList.of(
buildLocalityLbEndpoints("region1", "zone1", "subzone1",
ImmutableList.of(
buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2),
buildLbEndpoint("192.168.0.2", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.<DropOverload>of());
receiveEndpointUpdate(clusterLoadAssignment);
verify(resourceUpdateCallback, times(1)).onWorking();
verify(resourceUpdateCallback, never()).onError();
}
@Test
public void handleAllDropUpdates_pickersAreDropped() {
deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy);
@ -485,9 +447,8 @@ public class EdsLoadBalancerTest {
buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.<DropOverload>of());
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
verify(resourceUpdateCallback, never()).onAllDrop();
assertThat(childBalancers).hasSize(1);
verify(childBalancers.get("subzone1")).handleResolvedAddresses(
argThat(RoundRobinBackendsMatcher.builder().addHostAndPort("192.168.0.1", 8080).build()));
@ -515,15 +476,12 @@ public class EdsLoadBalancerTest {
buildDropOverload("cat_1", 3),
buildDropOverload("cat_2", 1_000_001),
buildDropOverload("cat_3", 4)));
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
verify(resourceUpdateCallback).onAllDrop();
verify(helper, atLeastOnce()).updateBalancingState(eq(READY), pickerCaptor.capture());
SubchannelPicker pickerExpectedDropAll = pickerCaptor.getValue();
assertThat(pickerExpectedDropAll.pickSubchannel(mock(PickSubchannelArgs.class)).isDrop())
.isTrue();
verify(resourceUpdateCallback, never()).onError();
}
@Test
@ -557,7 +515,7 @@ public class EdsLoadBalancerTest {
CLUSTER_NAME,
ImmutableList.of(localityLbEndpoints1, localityLbEndpoints2, localityLbEndpoints3),
ImmutableList.<DropOverload>of());
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
assertThat(childBalancers).hasSize(3);
verify(childBalancers.get("subzone1")).handleResolvedAddresses(
@ -586,8 +544,6 @@ public class EdsLoadBalancerTest {
verify(helper, never()).updateBalancingState(eq(READY), any(SubchannelPicker.class));
childHelper2.updateBalancingState(READY, picker);
assertLatestSubchannelPicker(subchannel);
verify(resourceUpdateCallback, never()).onError();
}
// Uses a fake LocalityStoreFactory that creates a mock LocalityStore, and verifies interaction
@ -614,9 +570,8 @@ public class EdsLoadBalancerTest {
return localityStore;
}
};
edsLb = new EdsLoadBalancer(
helper, resourceUpdateCallback, lbRegistry, localityStoreFactory, bootstrapper,
channelFactory);
edsLb =
new EdsLoadBalancer(helper, lbRegistry, localityStoreFactory, bootstrapper, channelFactory);
deliverResolvedAddresses("edsServiceName1", null, fakeEndpointPickingPolicy);
assertThat(localityStores).hasSize(1);
@ -632,7 +587,7 @@ public class EdsLoadBalancerTest {
ImmutableList.of(
buildDropOverload("cat_1", 3),
buildDropOverload("cat_2", 456)));
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
EndpointUpdate endpointUpdate = getEndpointUpdateFromClusterAssignment(clusterLoadAssignment);
verify(localityStore).updateDropPercentage(endpointUpdate.getDropPolicies());
verify(localityStore).updateLocalityStore(endpointUpdate.getLocalityLbEndpointsMap());
@ -648,7 +603,7 @@ public class EdsLoadBalancerTest {
ImmutableList.of(
buildDropOverload("cat_1", 3),
buildDropOverload("cat_3", 4)));
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
endpointUpdate = getEndpointUpdateFromClusterAssignment(clusterLoadAssignment);
verify(localityStore).updateDropPercentage(endpointUpdate.getDropPolicies());
@ -670,7 +625,7 @@ public class EdsLoadBalancerTest {
ImmutableList.of(
buildDropOverload("cat_1", 3),
buildDropOverload("cat_3", 4)));
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
endpointUpdate = getEndpointUpdateFromClusterAssignment(clusterLoadAssignment);
verify(localityStore).updateDropPercentage(endpointUpdate.getDropPolicies());
verify(localityStore).updateLocalityStore(endpointUpdate.getLocalityLbEndpointsMap());
@ -680,10 +635,8 @@ public class EdsLoadBalancerTest {
public void verifyErrorPropagation_noPreviousEndpointUpdateReceived() {
deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy);
verify(resourceUpdateCallback, never()).onError();
// Forwarding 20 seconds so that the xds client will deem EDS resource not available.
fakeClock.forwardTime(20, TimeUnit.SECONDS);
verify(resourceUpdateCallback).onError();
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
}
@ -699,17 +652,15 @@ public class EdsLoadBalancerTest {
buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.of(buildDropOverload("throttle", 1000)));
receiveEndpointUpdate(clusterLoadAssignment);
deliverClusterLoadAssignments(clusterLoadAssignment);
verify(helper, never()).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
verify(resourceUpdateCallback, never()).onError();
// XdsClient stream receives an error.
responseObserver.onError(new RuntimeException("fake error"));
verify(helper, never()).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
verify(resourceUpdateCallback).onError();
}
/**
@ -752,7 +703,7 @@ public class EdsLoadBalancerTest {
edsLb.handleResolvedAddresses(resolvedAddressBuilder.build());
}
private void receiveEndpointUpdate(ClusterLoadAssignment clusterLoadAssignment) {
private void deliverClusterLoadAssignments(ClusterLoadAssignment clusterLoadAssignment) {
responseObserver.onNext(
buildDiscoveryResponse(
String.valueOf(versionIno++),

View File

@ -1,261 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableList;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for {@link FallbackLb}.
*/
@RunWith(JUnit4.class)
// TODO(creamsoup) use parsed service config
public class FallbackLbTest {
private final LoadBalancerProvider fallbackProvider1 = new LoadBalancerProvider() {
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return "fallback_1";
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
helpers1.add(helper);
LoadBalancer balancer = mock(LoadBalancer.class);
balancers1.add(balancer);
return balancer;
}
};
private final LoadBalancerProvider fallbackProvider2 = new LoadBalancerProvider() {
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return "fallback_2";
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
// just return mock and recored helper and balancer
helpers2.add(helper);
LoadBalancer balancer = mock(LoadBalancer.class);
balancers2.add(balancer);
return balancer;
}
};
private final Helper helper = mock(Helper.class);
private final List<Helper> helpers1 = new ArrayList<>();
private final List<Helper> helpers2 = new ArrayList<>();
private final List<LoadBalancer> balancers1 = new ArrayList<>();
private final List<LoadBalancer> balancers2 = new ArrayList<>();
private final PolicySelection fakeChildPolicy =
new PolicySelection(mock(LoadBalancerProvider.class), null, new Object());
private LoadBalancer fallbackLb;
@Before
public void setUp() {
LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
lbRegistry.register(fallbackProvider1);
lbRegistry.register(fallbackProvider2);
fallbackLb = new FallbackLb(helper);
assertThat(helpers1).isEmpty();
assertThat(helpers2).isEmpty();
assertThat(balancers1).isEmpty();
assertThat(balancers2).isEmpty();
}
@Test
public void handlePolicyChanges() {
EquivalentAddressGroup eag111 = new EquivalentAddressGroup(mock(SocketAddress.class));
EquivalentAddressGroup eag112 = new EquivalentAddressGroup(mock(SocketAddress.class));
List<EquivalentAddressGroup> eags11 = ImmutableList.of(eag111, eag112);
Object lbConfig11 = new Object();
fallbackLb.handleResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(eags11)
.setLoadBalancingPolicyConfig(new XdsConfig(
null,
null,
fakeChildPolicy,
new PolicySelection(fallbackProvider1, null, lbConfig11)))
.build());
assertThat(helpers1).hasSize(1);
assertThat(balancers1).hasSize(1);
Helper helper1 = helpers1.get(0);
LoadBalancer balancer1 = balancers1.get(0);
verify(balancer1).handleResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(eags11)
.setLoadBalancingPolicyConfig(lbConfig11)
.build());
SubchannelPicker picker1 = mock(SubchannelPicker.class);
helper1.updateBalancingState(CONNECTING, picker1);
verify(helper).updateBalancingState(CONNECTING, picker1);
EquivalentAddressGroup eag121 = new EquivalentAddressGroup(mock(SocketAddress.class));
List<EquivalentAddressGroup> eags12 = ImmutableList.of(eag121);
Object lbConfig12 = new Object();
fallbackLb.handleResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(eags12)
.setLoadBalancingPolicyConfig(new XdsConfig(
null,
null,
fakeChildPolicy,
new PolicySelection(fallbackProvider1, null, lbConfig12)))
.build());
verify(balancer1).handleResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(eags12)
.setLoadBalancingPolicyConfig(lbConfig12)
.build());
verify(balancer1, never()).shutdown();
assertThat(helpers2).isEmpty();
assertThat(balancers2).isEmpty();
// change fallback policy to fallback_2
EquivalentAddressGroup eag211 = new EquivalentAddressGroup(mock(SocketAddress.class));
EquivalentAddressGroup eag212 = new EquivalentAddressGroup(mock(SocketAddress.class));
List<EquivalentAddressGroup> eags21 = ImmutableList.of(eag211, eag212);
Object lbConfig21 = new Object();
fallbackLb.handleResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(eags21)
.setLoadBalancingPolicyConfig(new XdsConfig(
null,
null,
fakeChildPolicy,
new PolicySelection(fallbackProvider2, null, lbConfig21)))
.build());
verify(balancer1).shutdown();
assertThat(helpers2).hasSize(1);
assertThat(balancers2).hasSize(1);
Helper helper2 = helpers2.get(0);
LoadBalancer balancer2 = balancers2.get(0);
verify(balancer1, never()).handleResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(eags21)
.setLoadBalancingPolicyConfig(lbConfig21)
.build());
verify(balancer2).handleResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(eags21)
.setLoadBalancingPolicyConfig(lbConfig21)
.build());
picker1 = mock(SubchannelPicker.class);
helper1.updateBalancingState(CONNECTING, picker1);
verify(helper, never()).updateBalancingState(CONNECTING, picker1);
SubchannelPicker picker2 = mock(SubchannelPicker.class);
helper2.updateBalancingState(CONNECTING, picker2);
verify(helper).updateBalancingState(CONNECTING, picker2);
EquivalentAddressGroup eag221 = new EquivalentAddressGroup(mock(SocketAddress.class));
List<EquivalentAddressGroup> eags22 = ImmutableList.of(eag221);
Object lbConfig22 = new Object();
fallbackLb.handleResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(eags22)
.setLoadBalancingPolicyConfig(new XdsConfig(
null,
null,
fakeChildPolicy,
new PolicySelection(fallbackProvider2, null, lbConfig22)))
.build());
verify(balancer2).handleResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(eags22)
.setLoadBalancingPolicyConfig(lbConfig22)
.build());
assertThat(helpers1).hasSize(1);
assertThat(balancers1).hasSize(1);
assertThat(helpers2).hasSize(1);
assertThat(balancers2).hasSize(1);
verify(balancer2, never()).shutdown();
fallbackLb.shutdown();
verify(balancer2).shutdown();
}
@Test
public void propagateAddressesToFallbackPolicy() {
EquivalentAddressGroup eag1 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8080)));
EquivalentAddressGroup eag2 = new EquivalentAddressGroup(
ImmutableList.<SocketAddress>of(new InetSocketAddress(8082)));
List<EquivalentAddressGroup> eags = ImmutableList.of(eag1, eag2);
Object lbConfig = new Object();
fallbackLb.handleResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(eags)
.setLoadBalancingPolicyConfig(new XdsConfig(
null,
null,
fakeChildPolicy,
new PolicySelection(fallbackProvider1, null, lbConfig)))
.build());
LoadBalancer balancer1 = balancers1.get(0);
verify(balancer1).handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.of(eag1, eag2))
.setLoadBalancingPolicyConfig(lbConfig)
.build());
}
}

View File

@ -1,130 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.internal.JsonParser;
import io.grpc.internal.ServiceConfigUtil;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig;
import java.util.HashMap;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/**
* Tests for {@link XdsLoadBalancerProvider}.
*/
@RunWith(JUnit4.class)
public class XdsLoadBalancerProviderTest {
@Rule
public final MockitoRule mocks = MockitoJUnit.rule();
@Mock
private LoadBalancer fakeBalancer1;
private final LoadBalancerRegistry lbRegistry = LoadBalancerRegistry.getDefaultRegistry();
private final Object lbConfig1 = new Object();
private final LoadBalancerProvider lbProvider1 = new LoadBalancerProvider() {
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return "supported_1";
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return fakeBalancer1;
}
@Override
public ConfigOrError parseLoadBalancingPolicyConfig(
Map<String, ?> rawLoadBalancingPolicyConfig) {
return ConfigOrError.fromConfig(lbConfig1);
}
};
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
lbRegistry.register(lbProvider1);
}
@After
public void tearDown() {
lbRegistry.deregister(lbProvider1);
}
@Test
public void parseLoadBalancingConfigPolicy() throws Exception {
String rawLbConfig = "{"
+ "\"childPolicy\" : "
+ " [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"supported_1\" : {}}],"
+ "\"fallbackPolicy\" : [],"
+ "\"edsServiceName\" : \"dns:///eds.service.com:8080\","
+ "\"lrsLoadReportingServerName\" : \"dns:///lrs.service.com:8080\""
+ "}";
Map<String, ?> rawlbConfigMap = checkObject(JsonParser.parse(rawLbConfig));
ConfigOrError configOrError =
XdsLoadBalancerProvider.parseLoadBalancingConfigPolicy(rawlbConfigMap, lbRegistry);
assertThat(configOrError.getError()).isNull();
assertThat(configOrError.getConfig()).isInstanceOf(XdsConfig.class);
assertThat(configOrError.getConfig()).isEqualTo(
new XdsConfig(
"dns:///eds.service.com:8080",
"dns:///lrs.service.com:8080",
new PolicySelection(lbProvider1,
ServiceConfigUtil.unwrapLoadBalancingConfig(
checkObject(JsonParser.parse("{\"supported_1\" : {}}"))).getRawConfigValue(),
lbConfig1),
new PolicySelection(
lbRegistry.getProvider("round_robin"),
new HashMap<String, Object>(),
"no service config"))
);
}
@SuppressWarnings("unchecked")
private static Map<String, ?> checkObject(Object o) {
return (Map<String, ?>) o;
}
}

View File

@ -1,328 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.READY;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.EdsLoadBalancer.ResourceUpdateCallback;
import io.grpc.xds.EdsLoadBalancerProvider.EdsConfig;
import io.grpc.xds.XdsLoadBalancer.PrimaryLbFactory;
import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/** Unit tests for {@link XdsLoadBalancer}. */
@RunWith(JUnit4.class)
public class XdsLoadBalancerTest {
private static final String AUTHORITY = "grpc-test.googleapis.com";
@Rule
public final ExpectedException thrown = ExpectedException.none();
@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();
private final FakeClock fakeClock = new FakeClock();
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new AssertionError(e);
}
});
@Mock
private Helper helper;
private LoadBalancer xdsLoadBalancer;
private ResourceUpdateCallback resourceUpdateCallback;
private Helper primaryLbHelper;
private final List<LoadBalancer> primaryLbs = new ArrayList<>();
private Helper fallbackLbHelper;
private final List<LoadBalancer> fallbackLbs = new ArrayList<>();
private int requestConnectionTimes;
private int primaryLbHandleAddrsTimes;
@Before
public void setUp() {
PrimaryLbFactory primaryLbFactory = new PrimaryLbFactory() {
@Override
public LoadBalancer newLoadBalancer(
Helper helper, ResourceUpdateCallback resourceUpdateCallback) {
// just return a mock and record the input and output
primaryLbHelper = helper;
XdsLoadBalancerTest.this.resourceUpdateCallback = resourceUpdateCallback;
LoadBalancer primaryLb = mock(LoadBalancer.class);
primaryLbs.add(primaryLb);
return primaryLb;
}
};
LoadBalancer.Factory fallbackLbFactory = new LoadBalancer.Factory() {
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
// just return a mock and record the input and output
fallbackLbHelper = helper;
LoadBalancer fallbackLb = mock(LoadBalancer.class);
fallbackLbs.add(fallbackLb);
return fallbackLb;
}
};
doReturn(AUTHORITY).when(helper).getAuthority();
doReturn(syncContext).when(helper).getSynchronizationContext();
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
doReturn(mock(ChannelLogger.class)).when(helper).getChannelLogger();
xdsLoadBalancer =
new XdsLoadBalancer(helper, primaryLbFactory, fallbackLbFactory);
xdsLoadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of()).build());
}
@Test
public void tearDown() {
assertThat(primaryLbs).hasSize(1);
xdsLoadBalancer.shutdown();
for (LoadBalancer primaryLb : primaryLbs) {
verify(primaryLb).shutdown();
}
for (LoadBalancer fallbackLb : fallbackLbs) {
verify(fallbackLb).shutdown();
}
assertThat(fakeClock.getPendingTasks()).isEmpty();
}
@Test
public void canHandleEmptyAddressListFromNameResolution() {
assertThat(xdsLoadBalancer.canHandleEmptyAddressListFromNameResolution()).isTrue();
}
@Test
public void timeoutAtStartup_expectUseFallback_thenBackendReady_expectExitFallback() {
verifyNotInFallbackMode();
fakeClock.forwardTime(9, TimeUnit.SECONDS);
resourceUpdateCallback.onWorking();
verifyNotInFallbackMode();
fakeClock.forwardTime(1, TimeUnit.SECONDS);
verifyInFallbackMode();
SubchannelPicker subchannelPicker = mock(SubchannelPicker.class);
primaryLbHelper.updateBalancingState(READY, subchannelPicker);
verify(helper).updateBalancingState(READY, subchannelPicker);
verifyNotInFallbackMode();
assertThat(fallbackLbs).hasSize(1);
}
@Test
public void backendReadyBeforeTimeoutAtStartup_expectNoFallback() {
verifyNotInFallbackMode();
fakeClock.forwardTime(9, TimeUnit.SECONDS);
verifyNotInFallbackMode();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
resourceUpdateCallback.onWorking();
SubchannelPicker subchannelPicker = mock(SubchannelPicker.class);
primaryLbHelper.updateBalancingState(READY, subchannelPicker);
verify(helper).updateBalancingState(READY, subchannelPicker);
assertThat(fakeClock.getPendingTasks()).isEmpty();
verifyNotInFallbackMode();
assertThat(fallbackLbs).isEmpty();
}
@Test
public void recevieAllDropBeforeTimeoutAtStartup_expectNoFallback() {
verifyNotInFallbackMode();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
resourceUpdateCallback.onAllDrop();
assertThat(fakeClock.getPendingTasks()).isEmpty();
verifyNotInFallbackMode();
assertThat(fallbackLbs).isEmpty();
}
@Test
public void primaryFailsWithoutSeeingEdsResponseBeforeTimeoutAtStartup() {
verifyNotInFallbackMode();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
resourceUpdateCallback.onError();
verifyInFallbackMode();
assertThat(fallbackLbs).hasSize(1);
}
@Test
public void primarySeeingEdsResponseThenFailsBeforeTimeoutAtStartup() {
verifyNotInFallbackMode();
assertThat(fakeClock.getPendingTasks()).hasSize(1);
resourceUpdateCallback.onWorking();
resourceUpdateCallback.onError();
verifyNotInFallbackMode();
fakeClock.forwardTime(10, TimeUnit.SECONDS);
verifyInFallbackMode();
assertThat(fallbackLbs).hasSize(1);
}
@Test
public void fallbackWillHandleLastResolvedAddresses() {
verifyNotInFallbackMode();
PolicySelection childPolicy =
new PolicySelection(mock(LoadBalancerProvider.class), null, null);
PolicySelection fallbackPolicy =
new PolicySelection(mock(LoadBalancerProvider.class), null, null);
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(
new XdsConfig(null, null, childPolicy, fallbackPolicy))
.build();
xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);
resourceUpdateCallback.onError();
LoadBalancer fallbackLb = Iterables.getLast(fallbackLbs);
verify(fallbackLb).handleResolvedAddresses(same(resolvedAddresses));
}
private void verifyInFallbackMode() {
assertThat(primaryLbs).isNotEmpty();
assertThat(fallbackLbs).isNotEmpty();
LoadBalancer primaryLb = Iterables.getLast(primaryLbs);
LoadBalancer fallbackLb = Iterables.getLast(fallbackLbs);
verify(primaryLb, never()).shutdown();
verify(fallbackLb, never()).shutdown();
xdsLoadBalancer.requestConnection();
verify(primaryLb, times(++requestConnectionTimes)).requestConnection();
verify(fallbackLb).requestConnection();
PolicySelection childPolicy =
new PolicySelection(mock(LoadBalancerProvider.class), null, null);
PolicySelection fallbackPolicy =
new PolicySelection(mock(LoadBalancerProvider.class), null, null);
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(
Attributes.newBuilder().set(Attributes.Key.create("k"), new Object()).build())
.setLoadBalancingPolicyConfig(new XdsConfig(null, null, childPolicy, fallbackPolicy))
.build();
xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);
verify(fallbackLb).handleResolvedAddresses(same(resolvedAddresses));
ArgumentCaptor<ResolvedAddresses> resolvedAddrCaptor = ArgumentCaptor.forClass(null);
verify(primaryLb, times(++primaryLbHandleAddrsTimes))
.handleResolvedAddresses(resolvedAddrCaptor.capture());
ResolvedAddresses capturedResolvedAddr = resolvedAddrCaptor.getValue();
EdsConfig edsConfig = (EdsConfig) capturedResolvedAddr.getLoadBalancingPolicyConfig();
assertThat(edsConfig.endpointPickingPolicy).isEqualTo(childPolicy);
Status status = Status.DATA_LOSS.withDescription("");
xdsLoadBalancer.handleNameResolutionError(status);
verify(primaryLb).handleNameResolutionError(same(status));
verify(fallbackLb).handleNameResolutionError(same(status));
SubchannelPicker subchannelPicker = mock(SubchannelPicker.class);
primaryLbHelper.updateBalancingState(CONNECTING, subchannelPicker);
verify(helper, never()).updateBalancingState(CONNECTING, subchannelPicker);
fallbackLbHelper.updateBalancingState(CONNECTING, subchannelPicker);
verify(helper).updateBalancingState(CONNECTING, subchannelPicker);
}
private void verifyNotInFallbackMode() {
for (LoadBalancer fallbackLb : fallbackLbs) {
verify(fallbackLb).shutdown();
}
LoadBalancer primaryLb = Iterables.getLast(primaryLbs);
xdsLoadBalancer.requestConnection();
verify(primaryLb, times(++requestConnectionTimes)).requestConnection();
PolicySelection childPolicy =
new PolicySelection(mock(LoadBalancerProvider.class), null, null);
PolicySelection fallbackPolicy =
new PolicySelection(mock(LoadBalancerProvider.class), null, null);
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(
Attributes.newBuilder().set(Attributes.Key.create("k"), new Object()).build())
.setLoadBalancingPolicyConfig(new XdsConfig(null, null, childPolicy, fallbackPolicy))
.build();
xdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);
ArgumentCaptor<ResolvedAddresses> resolvedAddrCaptor = ArgumentCaptor.forClass(null);
verify(primaryLb, times(++primaryLbHandleAddrsTimes))
.handleResolvedAddresses(resolvedAddrCaptor.capture());
ResolvedAddresses capturedResolvedAddr = resolvedAddrCaptor.getValue();
EdsConfig edsConfig = (EdsConfig) capturedResolvedAddr.getLoadBalancingPolicyConfig();
assertThat(edsConfig.endpointPickingPolicy).isEqualTo(childPolicy);
Status status = Status.DATA_LOSS.withDescription("");
xdsLoadBalancer.handleNameResolutionError(status);
verify(primaryLb).handleNameResolutionError(same(status));
SubchannelPicker subchannelPicker = mock(SubchannelPicker.class);
primaryLbHelper.updateBalancingState(CONNECTING, subchannelPicker);
verify(helper).updateBalancingState(CONNECTING, subchannelPicker);
}
@Deprecated
@Test
public void handleSubchannelState_shouldThrow() {
Subchannel subchannel = mock(Subchannel.class);
ConnectivityStateInfo connectivityStateInfo = ConnectivityStateInfo.forNonError(READY);
thrown.expect(UnsupportedOperationException.class);
xdsLoadBalancer.handleSubchannelState(subchannel, connectivityStateInfo);
}
}