xds: refactor EDS policy to use ObjectPool<XdsClient> with error handling fixes (#6450)

This commit is contained in:
ZHANG Dapeng 2019-12-04 10:40:20 -08:00 committed by GitHub
parent bfa085a1cf
commit 6b2e754746
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 619 additions and 239 deletions

View File

@ -17,17 +17,16 @@
package io.grpc.xds; package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.XdsNameResolver.XDS_CHANNEL_CREDS_LIST; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.xds.XdsNameResolver.XDS_NODE;
import static java.util.logging.Level.FINEST; import static java.util.logging.Level.FINEST;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import io.envoyproxy.envoy.api.v2.core.Node; import io.envoyproxy.envoy.api.v2.core.Node;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry; import io.grpc.LoadBalancerRegistry;
@ -38,8 +37,9 @@ import io.grpc.Status;
import io.grpc.alts.GoogleDefaultChannelBuilder; import io.grpc.alts.GoogleDefaultChannelBuilder;
import io.grpc.internal.ExponentialBackoffPolicy; import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.util.ForwardingLoadBalancer; import io.grpc.internal.ObjectPool;
import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.ChannelCreds; import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.EnvoyProtoData.DropOverload; import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality; import io.grpc.xds.EnvoyProtoData.Locality;
@ -49,169 +49,223 @@ import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory;
import io.grpc.xds.LocalityStore.LocalityStoreFactory; import io.grpc.xds.LocalityStore.LocalityStoreFactory;
import io.grpc.xds.XdsClient.EndpointUpdate; import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher; import io.grpc.xds.XdsClient.EndpointWatcher;
import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool;
import io.grpc.xds.XdsClient.XdsClientFactory;
import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig; import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** Lookaside load balancer that handles balancer name changes. */ /** Lookaside load balancer that handles EDS config. */
final class LookasideLb extends ForwardingLoadBalancer { final class LookasideLb extends LoadBalancer {
private final EdsUpdateCallback edsUpdateCallback; private final ChannelLogger channelLogger;
private final GracefulSwitchLoadBalancer lookasideChannelLb; private final EndpointUpdateCallback endpointUpdateCallback;
private final GracefulSwitchLoadBalancer switchingLoadBalancer;
private final LoadBalancerRegistry lbRegistry; private final LoadBalancerRegistry lbRegistry;
private final LocalityStoreFactory localityStoreFactory; private final LocalityStoreFactory localityStoreFactory;
private final LoadReportClientFactory loadReportClientFactory; private final LoadReportClientFactory loadReportClientFactory;
private final Bootstrapper bootstrapper;
private final Helper lookasideLbHelper;
private String balancerName; // Most recent XdsConfig.
// Becomes non-null once handleResolvedAddresses() successfully.
@Nullable
private XdsConfig xdsConfig;
// Most recent EndpointWatcher.
// Becomes non-null once handleResolvedAddresses() successfully.
@Nullable
private EndpointWatcher endpointWatcher;
LookasideLb(Helper lookasideLbHelper, EdsUpdateCallback edsUpdateCallback) { // Becomes non-null and calls getObject() once handleResolvedAddresses() successfully.
// Will call returnObject() at balancer shutdown.
@Nullable
private ObjectPool<XdsClient> xdsClientRef;
// Becomes non-null once handleResolvedAddresses() successfully.
@Nullable
XdsClient xdsClient;
// Becomes non-null for EDS-only case once handleResolvedAddresses() successfully.
// TODO(zdapeng): Stop using it once XdsClientImpl is used.
@Nullable
ManagedChannel channel;
LookasideLb(Helper lookasideLbHelper, EndpointUpdateCallback endpointUpdateCallback) {
this( this(
lookasideLbHelper, checkNotNull(lookasideLbHelper, "lookasideLbHelper"),
edsUpdateCallback, checkNotNull(endpointUpdateCallback, "endpointUpdateCallback"),
LoadBalancerRegistry.getDefaultRegistry(), LoadBalancerRegistry.getDefaultRegistry(),
LocalityStoreFactory.getInstance(), LocalityStoreFactory.getInstance(),
LoadReportClientFactory.getInstance()); LoadReportClientFactory.getInstance(),
Bootstrapper.getInstance());
} }
@VisibleForTesting @VisibleForTesting
LookasideLb( LookasideLb(
Helper lookasideLbHelper, Helper lookasideLbHelper,
EdsUpdateCallback edsUpdateCallback, EndpointUpdateCallback endpointUpdateCallback,
LoadBalancerRegistry lbRegistry, LoadBalancerRegistry lbRegistry,
LocalityStoreFactory localityStoreFactory, LocalityStoreFactory localityStoreFactory,
LoadReportClientFactory loadReportClientFactory) { LoadReportClientFactory loadReportClientFactory,
this.edsUpdateCallback = edsUpdateCallback; Bootstrapper bootstrapper) {
this.lookasideLbHelper = lookasideLbHelper;
this.channelLogger = lookasideLbHelper.getChannelLogger();
this.endpointUpdateCallback = endpointUpdateCallback;
this.lbRegistry = lbRegistry; this.lbRegistry = lbRegistry;
this.lookasideChannelLb = new GracefulSwitchLoadBalancer(lookasideLbHelper); this.switchingLoadBalancer = new GracefulSwitchLoadBalancer(lookasideLbHelper);
this.localityStoreFactory = localityStoreFactory; this.localityStoreFactory = localityStoreFactory;
this.loadReportClientFactory = loadReportClientFactory; this.loadReportClientFactory = loadReportClientFactory;
} this.bootstrapper = bootstrapper;
@Override
protected LoadBalancer delegate() {
return lookasideChannelLb;
} }
@Override @Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
channelLogger.log(ChannelLogLevel.DEBUG, "Received ResolvedAddresses '%s'", resolvedAddresses);
// In the future, xdsConfig can be gotten directly by // In the future, xdsConfig can be gotten directly by
// resolvedAddresses.getLoadBalancingPolicyConfig() // resolvedAddresses.getLoadBalancingPolicyConfig().
Attributes attributes = resolvedAddresses.getAttributes(); Attributes attributes = resolvedAddresses.getAttributes();
Map<String, ?> newRawLbConfig = checkNotNull( Map<String, ?> newRawLbConfig = attributes.get(ATTR_LOAD_BALANCING_CONFIG);
attributes.get(ATTR_LOAD_BALANCING_CONFIG), "ATTR_LOAD_BALANCING_CONFIG not available"); if (newRawLbConfig == null) {
// This will not happen when the service config error handling is implemented.
// For now simply go to TRANSIENT_FAILURE.
lookasideLbHelper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(
Status.UNAVAILABLE.withDescription("ATTR_LOAD_BALANCING_CONFIG not available")));
return;
}
ConfigOrError cfg = ConfigOrError cfg =
XdsLoadBalancerProvider.parseLoadBalancingConfigPolicy(newRawLbConfig, lbRegistry); XdsLoadBalancerProvider.parseLoadBalancingConfigPolicy(newRawLbConfig, lbRegistry);
if (cfg.getError() != null) { if (cfg.getError() != null) {
throw cfg.getError().asRuntimeException(); // This will not happen when the service config error handling is implemented.
// For now simply go to TRANSIENT_FAILURE.
lookasideLbHelper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(cfg.getError()));
return;
} }
XdsConfig xdsConfig = (XdsConfig) cfg.getConfig(); XdsConfig newXdsConfig = (XdsConfig) cfg.getConfig();
ObjectPool<XdsClient> xdsClientRefFromResolver = attributes.get(XdsAttributes.XDS_CLIENT_REF);
ObjectPool<XdsClient> xdsClientRef;
final String newBalancerName = xdsConfig.balancerName; // Init XdsClient.
if (xdsClient == null) {
// The is to handle the legacy usecase that requires balancerName from xds config. // There are three usecases:
if (!newBalancerName.equals(balancerName)) { // 1. The EDS-only legacy usecase that requires balancerName from xds config.
balancerName = newBalancerName; // cache the name and check next time for optimization // Note: we don't support balancerName change.
Node nodeFromResolvedAddresses = resolvedAddresses.getAttributes().get(XDS_NODE); // TODO(zdapeng): Remove the legacy case.
final Node node; // 2. The EDS-only with bootstrap usecase:
if (nodeFromResolvedAddresses == null) { // The name resolver resolves a ResolvedAddresses with an XdsConfig without balancerName
node = Node.newBuilder() // field. Use the bootstrap information to create a channel.
.setMetadata(Struct.newBuilder() // 3. Non EDS-only usecase:
.putFields( // XDS_CLIENT_REF attribute is available from ResolvedAddresses either from
"endpoints_required", // XdsNameResolver or CDS policy.
Value.newBuilder().setBoolValue(true).build())) //
.build(); // We assume XdsConfig switching happens only within one usecase, and there is no switching
// between different usecases.
if (newXdsConfig.balancerName != null) {
// This is the EDS-only legacy usecase that requires balancerName from xds config.
channel = initLbChannel(
lookasideLbHelper, newXdsConfig.balancerName, Collections.<ChannelCreds>emptyList());
xdsClientRef = new RefCountedXdsClientObjectPool(new XdsClientFactory() {
@Override
XdsClient createXdsClient() {
return new XdsComms2(
channel, lookasideLbHelper, new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER, Node.getDefaultInstance());
}
});
} else if (xdsClientRefFromResolver != null) {
// This is the Non EDS-only usecase.
xdsClientRef = xdsClientRefFromResolver;
} else { } else {
node = nodeFromResolvedAddresses; // This is the EDS-only with bootstrap usecase.
} final BootstrapInfo bootstrapInfo;
List<ChannelCreds> channelCredsListFromResolvedAddresses = try {
resolvedAddresses.getAttributes().get(XDS_CHANNEL_CREDS_LIST); bootstrapInfo = bootstrapper.readBootstrap();
final List<ChannelCreds> channelCredsList; } catch (Exception e) {
if (channelCredsListFromResolvedAddresses == null) { lookasideLbHelper.updateBalancingState(
channelCredsList = Collections.emptyList(); TRANSIENT_FAILURE,
} else { new ErrorPicker(Status.UNAVAILABLE.withCause(e)));
channelCredsList = channelCredsListFromResolvedAddresses; return;
}
channel = initLbChannel(
lookasideLbHelper, bootstrapInfo.getServerUri(),
bootstrapInfo.getChannelCredentials());
xdsClientRef = new RefCountedXdsClientObjectPool(new XdsClientFactory() {
@Override
XdsClient createXdsClient() {
// TODO(zdapeng): Replace XdsComms2 with XdsClientImpl.
return new XdsComms2(
channel, lookasideLbHelper, new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER, bootstrapInfo.getNode());
}
});
} }
LoadBalancerProvider childBalancerProvider = new LoadBalancerProvider() { // At this point the xdsClientRef is assigned in all usecases, cache them for later use.
@Override this.xdsClientRef = xdsClientRef;
public boolean isAvailable() { xdsClient = xdsClientRef.getObject();
return true;
}
@Override
public int getPriority() {
return 5;
}
/**
* A synthetic policy name identified by balancerName. The implementation detail doesn't
* matter.
*/
@Override
public String getPolicyName() {
return "xds_child_policy_balancer_name_" + newBalancerName;
}
@Override
public LoadBalancer newLoadBalancer(final Helper helper) {
return new LoadBalancer() {
@Nullable
XdsClient xdsClient;
@Nullable
LocalityStore localityStore;
@Nullable
LoadReportClient lrsClient;
@Override
public void handleNameResolutionError(Status error) {}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
if (xdsClient == null) {
ManagedChannel channel = initLbChannel(helper, newBalancerName, channelCredsList);
xdsClient = new XdsComms2(
channel, helper, new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER, node);
LoadStatsStore loadStatsStore = new LoadStatsStoreImpl();
localityStore = localityStoreFactory.newLocalityStore(
helper, lbRegistry, loadStatsStore);
// TODO(zdapeng): Use XdsClient to do Lrs directly.
lrsClient = loadReportClientFactory.createLoadReportClient(
channel, helper, new ExponentialBackoffPolicy.Provider(),
loadStatsStore);
final LoadReportCallback lrsCallback =
new LoadReportCallback() {
@Override
public void onReportResponse(long reportIntervalNano) {
localityStore.updateOobMetricsReportInterval(reportIntervalNano);
}
};
EndpointWatcher endpointWatcher =
new EndpointWatcherImpl(lrsClient, lrsCallback, localityStore);
xdsClient.watchEndpointData(node.getCluster(), endpointWatcher);
}
}
@Override
public void shutdown() {
if (xdsClient != null) {
lrsClient.stopLoadReporting();
localityStore.reset();
xdsClient.shutdown();
}
}
};
}
};
lookasideChannelLb.switchTo(childBalancerProvider);
} }
lookasideChannelLb.handleResolvedAddresses(resolvedAddresses); // Note: balancerName change is unsupported and ignored.
// TODO(zdapeng): Remove support for balancerName.
// Note: childPolicy change will be handled in LocalityStore, to be implemented.
// If edsServiceName in XdsConfig is changed, do a graceful switch.
if (xdsConfig == null
|| !Objects.equals(newXdsConfig.edsServiceName, xdsConfig.edsServiceName)) {
String edsServiceName = newXdsConfig.edsServiceName;
// The edsServiceName field is null in legacy gRPC client with EDS: use target authority for
// querying endpoints, but in the future we expect this to be explicitly given by EDS config.
// We assume if edsServiceName is null, it will always be null in later resolver updates;
// and if edsServiceName is not null, it will always be not null.
if (edsServiceName == null) {
edsServiceName = lookasideLbHelper.getAuthority();
}
LoadBalancerProvider clusterEndpointsLoadBalancer =
new ClusterEndpointsBalancerProvider(edsServiceName);
switchingLoadBalancer.switchTo(clusterEndpointsLoadBalancer);
}
resolvedAddresses = resolvedAddresses.toBuilder()
.setAttributes(attributes.toBuilder().discard(ATTR_LOAD_BALANCING_CONFIG).build())
.setLoadBalancingPolicyConfig(newXdsConfig)
.build();
switchingLoadBalancer.handleResolvedAddresses(resolvedAddresses);
this.xdsConfig = newXdsConfig;
// TODO(zdapeng): If lrsServerName in XdsConfig is changed, call xdsClient.reportClientStats()
// and/or xdsClient.cancelClientStatsReport().
}
@Override
public void handleNameResolutionError(Status error) {
channelLogger.log(ChannelLogLevel.ERROR, "Name resolution error: '%s'", error);
// Go into TRANSIENT_FAILURE if we have not yet received any endpoint update. Otherwise,
// we keep running with the data we had previously.
if (endpointWatcher == null) {
lookasideLbHelper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
} else {
switchingLoadBalancer.handleNameResolutionError(error);
}
}
@Override
public boolean canHandleEmptyAddressListFromNameResolution() {
return true;
}
@Override
public void shutdown() {
channelLogger.log(ChannelLogLevel.DEBUG, "EDS load balancer is shutting down");
switchingLoadBalancer.shutdown();
if (xdsClientRef != null) {
xdsClientRef.returnObject(xdsClient);
}
} }
private static ManagedChannel initLbChannel( private static ManagedChannel initLbChannel(
@ -222,7 +276,7 @@ final class LookasideLb extends ForwardingLoadBalancer {
try { try {
channel = helper.createResolvingOobChannel(balancerName); channel = helper.createResolvingOobChannel(balancerName);
} catch (UnsupportedOperationException uoe) { } catch (UnsupportedOperationException uoe) {
// Temporary solution until createResolvingOobChannel is implemented // Temporary solution until createResolvingOobChannel is implemented.
// FIXME (https://github.com/grpc/grpc-java/issues/5495) // FIXME (https://github.com/grpc/grpc-java/issues/5495)
Logger logger = Logger.getLogger(LookasideLb.class.getName()); Logger logger = Logger.getLogger(LookasideLb.class.getName());
if (logger.isLoggable(FINEST)) { if (logger.isLoggable(FINEST)) {
@ -248,10 +302,134 @@ final class LookasideLb extends ForwardingLoadBalancer {
return channel; return channel;
} }
private final class ClusterEndpointsBalancerProvider extends LoadBalancerProvider {
final String edsServiceName;
@Nullable
final String oldEdsServiceName;
@Nullable
final EndpointWatcher oldEndpointWatcher;
ClusterEndpointsBalancerProvider(String edsServiceName) {
this.edsServiceName = edsServiceName;
if (xdsConfig != null) {
oldEdsServiceName = xdsConfig.edsServiceName;
} else {
oldEdsServiceName = null;
}
oldEndpointWatcher = endpointWatcher;
}
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
// A synthetic policy name identified by edsServiceName in XdsConfig.
@Override
public String getPolicyName() {
return "xds_policy__edsServiceName_" + edsServiceName;
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return new ClusterEndpointsBalancer(helper);
}
/**
* Load-balances endpoints for a given cluster.
*/
final class ClusterEndpointsBalancer extends LoadBalancer {
final Helper helper;
// All fields become non-null once handleResolvedAddresses() successfully.
// All fields are assigned at most once.
@Nullable
LocalityStore localityStore;
@Nullable
LoadReportClient lrsClient;
@Nullable
EndpointWatcherImpl endpointWatcher;
ClusterEndpointsBalancer(Helper helper) {
this.helper = helper;
}
@Override
public void handleNameResolutionError(Status error) {
// Go into TRANSIENT_FAILURE if we have not yet received any endpoint update. Otherwise,
// we keep running with the data we had previously.
if (endpointWatcher == null || !endpointWatcher.firstEndpointUpdateReceived) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
@Override
public boolean canHandleEmptyAddressListFromNameResolution() {
return true;
}
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
XdsConfig xdsConfig = (XdsConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
if (endpointWatcher != null) {
// TODO(zddapeng): Handle child policy changed if any.
return;
}
LoadStatsStore loadStatsStore = new LoadStatsStoreImpl();
localityStore = localityStoreFactory.newLocalityStore(helper, lbRegistry, loadStatsStore);
LoadReportCallback lrsCallback =
new LoadReportCallback() {
@Override
public void onReportResponse(long reportIntervalNano) {
localityStore.updateOobMetricsReportInterval(reportIntervalNano);
}
};
// TODO(zdapeng): Use XdsClient to do Lrs directly.
// For now create an LRS Client.
if (xdsConfig.balancerName != null) {
lrsClient = loadReportClientFactory.createLoadReportClient(
channel, helper, new ExponentialBackoffPolicy.Provider(), loadStatsStore);
} else {
lrsClient = new LoadReportClient() {
@Override
public void startLoadReporting(LoadReportCallback callback) {}
@Override
public void stopLoadReporting() {}
};
}
endpointWatcher = new EndpointWatcherImpl(lrsClient, lrsCallback, localityStore);
xdsClient.watchEndpointData(edsServiceName, endpointWatcher);
if (oldEndpointWatcher != null && oldEdsServiceName != null) {
xdsClient.cancelEndpointDataWatch(oldEdsServiceName, oldEndpointWatcher);
}
LookasideLb.this.endpointWatcher = endpointWatcher;
}
@Override
public void shutdown() {
if (endpointWatcher != null) {
lrsClient.stopLoadReporting();
localityStore.reset();
xdsClient.cancelEndpointDataWatch(edsServiceName, endpointWatcher);
}
}
}
}
/** /**
* Callbacks for the EDS-only-with-fallback usecase. Being deprecated. * Callbacks for the EDS-only-with-fallback usecase. Being deprecated.
*/ */
interface EdsUpdateCallback { interface EndpointUpdateCallback {
void onWorking(); void onWorking();
@ -265,7 +443,7 @@ final class LookasideLb extends ForwardingLoadBalancer {
final LoadReportClient lrsClient; final LoadReportClient lrsClient;
final LoadReportCallback lrsCallback; final LoadReportCallback lrsCallback;
final LocalityStore localityStore; final LocalityStore localityStore;
boolean firstEdsUpdateReceived; boolean firstEndpointUpdateReceived;
EndpointWatcherImpl( EndpointWatcherImpl(
LoadReportClient lrsClient, LoadReportCallback lrsCallback, LocalityStore localityStore) { LoadReportClient lrsClient, LoadReportCallback lrsCallback, LocalityStore localityStore) {
@ -276,9 +454,14 @@ final class LookasideLb extends ForwardingLoadBalancer {
@Override @Override
public void onEndpointChanged(EndpointUpdate endpointUpdate) { public void onEndpointChanged(EndpointUpdate endpointUpdate) {
if (!firstEdsUpdateReceived) { channelLogger.log(
firstEdsUpdateReceived = true; ChannelLogLevel.DEBUG,
edsUpdateCallback.onWorking(); "EDS load balancer received an endpoint update: '%s'",
endpointUpdate);
if (!firstEndpointUpdateReceived) {
firstEndpointUpdateReceived = true;
endpointUpdateCallback.onWorking();
lrsClient.startLoadReporting(lrsCallback); lrsClient.startLoadReporting(lrsCallback);
} }
@ -287,7 +470,7 @@ final class LookasideLb extends ForwardingLoadBalancer {
for (DropOverload dropOverload : dropOverloads) { for (DropOverload dropOverload : dropOverloads) {
dropOverloadsBuilder.add(dropOverload); dropOverloadsBuilder.add(dropOverload);
if (dropOverload.getDropsPerMillion() == 1_000_000) { if (dropOverload.getDropsPerMillion() == 1_000_000) {
edsUpdateCallback.onAllDrop(); endpointUpdateCallback.onAllDrop();
break; break;
} }
} }
@ -309,7 +492,9 @@ final class LookasideLb extends ForwardingLoadBalancer {
@Override @Override
public void onError(Status error) { public void onError(Status error) {
edsUpdateCallback.onError(); channelLogger.log(
ChannelLogLevel.ERROR, "EDS load balancer received an error: '%s'", error);
endpointUpdateCallback.onError();
} }
} }
} }

View File

@ -27,7 +27,7 @@ import io.grpc.LoadBalancer;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.LookasideLb.EdsUpdateCallback; import io.grpc.xds.LookasideLb.EndpointUpdateCallback;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.annotation.CheckForNull; import javax.annotation.CheckForNull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -46,7 +46,7 @@ final class XdsLoadBalancer2 extends LoadBalancer {
private final Helper helper; private final Helper helper;
private final LoadBalancer lookasideLb; private final LoadBalancer lookasideLb;
private final LoadBalancer.Factory fallbackLbFactory; private final LoadBalancer.Factory fallbackLbFactory;
private final EdsUpdateCallback edsUpdateCallback = new EdsUpdateCallback() { private final EndpointUpdateCallback edsUpdateCallback = new EndpointUpdateCallback() {
@Override @Override
public void onWorking() { public void onWorking() {
if (childPolicyHasBeenReady) { if (childPolicyHasBeenReady) {
@ -247,13 +247,13 @@ final class XdsLoadBalancer2 extends LoadBalancer {
/** Factory of a look-aside load balancer. The interface itself is for convenience in test. */ /** Factory of a look-aside load balancer. The interface itself is for convenience in test. */
@VisibleForTesting @VisibleForTesting
interface LookasideLbFactory { interface LookasideLbFactory {
LoadBalancer newLoadBalancer(Helper helper, EdsUpdateCallback edsUpdateCallback); LoadBalancer newLoadBalancer(Helper helper, EndpointUpdateCallback edsUpdateCallback);
} }
private static final class LookasideLbFactoryImpl implements LookasideLbFactory { private static final class LookasideLbFactoryImpl implements LookasideLbFactory {
@Override @Override
public LoadBalancer newLoadBalancer( public LoadBalancer newLoadBalancer(
Helper lookasideLbHelper, EdsUpdateCallback edsUpdateCallback) { Helper lookasideLbHelper, EndpointUpdateCallback edsUpdateCallback) {
return new LookasideLb(lookasideLbHelper, edsUpdateCallback); return new LookasideLb(lookasideLbHelper, edsUpdateCallback);
} }
} }

View File

@ -19,6 +19,7 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG; import static io.grpc.LoadBalancer.ATTR_LOAD_BALANCING_CONFIG;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
@ -42,6 +43,7 @@ import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest; import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse; import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.envoyproxy.envoy.api.v2.core.Address; import io.envoyproxy.envoy.api.v2.core.Address;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.core.SocketAddress; import io.envoyproxy.envoy.api.v2.core.SocketAddress;
import io.envoyproxy.envoy.api.v2.endpoint.Endpoint; import io.envoyproxy.envoy.api.v2.endpoint.Endpoint;
import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint; import io.envoyproxy.envoy.api.v2.endpoint.LbEndpoint;
@ -56,6 +58,7 @@ import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry; import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.Status; import io.grpc.Status;
@ -65,17 +68,25 @@ import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.BackoffPolicy.Provider; import io.grpc.internal.BackoffPolicy.Provider;
import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock;
import io.grpc.internal.JsonParser; import io.grpc.internal.JsonParser;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.testing.StreamRecorder; import io.grpc.internal.testing.StreamRecorder;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.EnvoyProtoData.DropOverload; import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality; import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.LoadReportClient.LoadReportCallback; import io.grpc.xds.LoadReportClient.LoadReportCallback;
import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory; import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory;
import io.grpc.xds.LocalityStore.LocalityStoreFactory; import io.grpc.xds.LocalityStore.LocalityStoreFactory;
import io.grpc.xds.LookasideLb.EdsUpdateCallback; import io.grpc.xds.LookasideLb.EndpointUpdateCallback;
import java.util.ArrayList; import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool;
import io.grpc.xds.XdsClient.XdsClientFactory;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.junit.Before; import org.junit.Before;
@ -118,15 +129,17 @@ public class LookasideLbTest {
.addResources(Any.pack(ClusterLoadAssignment.getDefaultInstance())) .addResources(Any.pack(ClusterLoadAssignment.getDefaultInstance()))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build(); .build();
private final List<Helper> helpers = new ArrayList<>(); private final Deque<Helper> helpers = new ArrayDeque<>();
private final List<LocalityStore> localityStores = new ArrayList<>(); private final Deque<LocalityStore> localityStores = new ArrayDeque<>();
private final List<LoadReportClient> loadReportClients = new ArrayList<>(); private final Deque<LoadReportClient> loadReportClients = new ArrayDeque<>();
private final FakeClock fakeClock = new FakeClock(); private final FakeClock fakeClock = new FakeClock();
@Mock @Mock
private Helper helper; private Helper helper;
@Mock @Mock
private EdsUpdateCallback edsUpdateCallback; private EndpointUpdateCallback edsUpdateCallback;
@Mock
private Bootstrapper bootstrapper;
@Captor @Captor
private ArgumentCaptor<ImmutableMap<Locality, LocalityLbEndpoints>> private ArgumentCaptor<ImmutableMap<Locality, LocalityLbEndpoints>>
localityEndpointsMappingCaptor; localityEndpointsMappingCaptor;
@ -134,7 +147,6 @@ public class LookasideLbTest {
private ManagedChannel channel; private ManagedChannel channel;
private ManagedChannel channel2; private ManagedChannel channel2;
private StreamObserver<DiscoveryResponse> serverResponseWriter; private StreamObserver<DiscoveryResponse> serverResponseWriter;
private LocalityStoreFactory localityStoreFactory;
private LoadBalancer lookasideLb; private LoadBalancer lookasideLb;
private ResolvedAddresses defaultResolvedAddress; private ResolvedAddresses defaultResolvedAddress;
@ -192,7 +204,7 @@ public class LookasideLbTest {
doReturn(channel, channel2).when(helper).createResolvingOobChannel(anyString()); doReturn(channel, channel2).when(helper).createResolvingOobChannel(anyString());
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService(); doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
localityStoreFactory = new LocalityStoreFactory() { LocalityStoreFactory localityStoreFactory = new LocalityStoreFactory() {
@Override @Override
public LocalityStore newLocalityStore( public LocalityStore newLocalityStore(
Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore) { Helper helper, LoadBalancerRegistry lbRegistry, LoadStatsStore loadStatsStore) {
@ -213,9 +225,32 @@ public class LookasideLbTest {
} }
}; };
LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();
lbRegistry.register(new LoadBalancerProvider() {
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 5;
}
@Override
public String getPolicyName() {
return "supported1";
}
@Override
public LoadBalancer newLoadBalancer(Helper helper) {
return mock(LoadBalancer.class);
}
});
lookasideLb = new LookasideLb( lookasideLb = new LookasideLb(
helper, edsUpdateCallback, new LoadBalancerRegistry(), localityStoreFactory, helper, edsUpdateCallback, lbRegistry, localityStoreFactory, loadReportClientFactory,
loadReportClientFactory); bootstrapper);
String lbConfigRaw11 = "{\"balancerName\" : \"dns:///balancer1.example.com:8080\"}"; String lbConfigRaw11 = "{\"balancerName\" : \"dns:///balancer1.example.com:8080\"}";
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -227,144 +262,222 @@ public class LookasideLbTest {
} }
@Test @Test
public void handleChildPolicyChangeThenBalancerNameChangeThenChildPolicyChange() public void canHandleEmptyAddressListFromNameResolution() {
assertThat(lookasideLb.canHandleEmptyAddressListFromNameResolution()).isTrue();
}
@Test
public void handleNameResolutionErrorBeforeAndAfterEdsWorkding() throws Exception {
XdsClientFactory xdsClientFactory = new XdsClientFactory() {
@Override
XdsClient createXdsClient() {
return mock(XdsClient.class);
}
};
ObjectPool<XdsClient> xdsClientRef = new RefCountedXdsClientObjectPool(xdsClientFactory);
XdsClient xdsClientFromResolver = xdsClientRef.getObject();
String lbConfigRaw =
"{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName1'}"
.replace("'", "\"");
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(Attributes.newBuilder()
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig)
.set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
.build())
.build();
lookasideLb.handleResolvedAddresses(resolvedAddresses);
assertThat(helpers).hasSize(1);
assertThat(localityStores).hasSize(1);
ArgumentCaptor<EndpointWatcher> endpointWatcherCaptor =
ArgumentCaptor.forClass(EndpointWatcher.class);
verify(xdsClientFromResolver).watchEndpointData(
eq("edsServiceName1"), endpointWatcherCaptor.capture());
EndpointWatcher endpointWatcher = endpointWatcherCaptor.getValue();
// handleResolutionError() before receiving any endpoint update.
lookasideLb.handleNameResolutionError(Status.DATA_LOSS.withDescription("fake status"));
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
// Endpoint update received.
endpointWatcher.onEndpointChanged(
EndpointUpdate.newBuilder().setClusterName("edsServiceName1").build());
// handleResolutionError() after receiving endpoint update.
lookasideLb.handleNameResolutionError(Status.DATA_LOSS.withDescription("fake status"));
// No more TRANSIENT_FAILURE.
verify(helper, times(1)).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
}
@SuppressWarnings("unchecked")
@Test
public void handleEdsServiceNameChangeInXdsConfig_swtichGracefully()
throws Exception { throws Exception {
assertThat(helpers).isEmpty(); assertThat(helpers).isEmpty();
assertThat(localityStores).isEmpty(); assertThat(localityStores).isEmpty();
assertThat(loadReportClients).isEmpty(); assertThat(loadReportClients).isEmpty();
List<EquivalentAddressGroup> eags = ImmutableList.of(); List<EquivalentAddressGroup> eags = ImmutableList.of();
String lbConfigRaw11 = "{\"balancerName\" : \"dns:///balancer1.example.com:8080\"}"; XdsClientFactory xdsClientFactory = new XdsClientFactory() {
@Override
XdsClient createXdsClient() {
return mock(XdsClient.class);
}
};
ObjectPool<XdsClient> xdsClientRef = new RefCountedXdsClientObjectPool(xdsClientFactory);
XdsClient xdsClientFromResolver = xdsClientRef.getObject();
String lbConfigRaw =
"{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName1'}"
.replace("'", "\"");
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String, ?> lbConfig11 = (Map<String, ?>) JsonParser.parse(lbConfigRaw11); Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(eags) .setAddresses(eags)
.setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig11).build()) .setAttributes(Attributes.newBuilder()
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig)
.set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
.build())
.build(); .build();
lookasideLb.handleResolvedAddresses(resolvedAddresses); lookasideLb.handleResolvedAddresses(resolvedAddresses);
assertThat(helpers).hasSize(1); assertThat(helpers).hasSize(1);
assertThat(localityStores).hasSize(1); assertThat(localityStores).hasSize(1);
assertThat(loadReportClients).hasSize(1); Helper helper1 = helpers.peekLast();
Helper helper1 = helpers.get(0); LocalityStore localityStore1 = localityStores.peekLast();
SubchannelPicker picker1 = mock(SubchannelPicker.class); SubchannelPicker picker1 = mock(SubchannelPicker.class);
helper1.updateBalancingState(CONNECTING, picker1); helper1.updateBalancingState(CONNECTING, picker1);
verify(helper).updateBalancingState(CONNECTING, picker1); verify(helper).updateBalancingState(CONNECTING, picker1);
String lbConfigRaw12 = "{" // Change edsServicename to edsServiceName2.
+ "\"balancerName\" : \"dns:///balancer1.example.com:8080\"," lbConfigRaw = "{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName2'}"
+ "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]" .replace("'", "\"");
+ "}"; lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig12 = (Map<String, ?>) JsonParser.parse(lbConfigRaw12);
resolvedAddresses = ResolvedAddresses.newBuilder() resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(eags) .setAddresses(eags)
.setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig12).build()) .setAttributes(Attributes.newBuilder()
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig)
.set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
.build())
.build(); .build();
lookasideLb.handleResolvedAddresses(resolvedAddresses); lookasideLb.handleResolvedAddresses(resolvedAddresses);
LocalityStore localityStore1 = Iterables.getOnlyElement(localityStores);
LoadReportClient loadReportClient1 = Iterables.getOnlyElement(loadReportClients);
verify(localityStore1, never()).reset();
verify(loadReportClient1, never()).stopLoadReporting();
assertThat(helpers).hasSize(1);
assertThat(localityStores).hasSize(1);
assertThat(loadReportClients).hasSize(1);
// change balancer name policy to balancer2.example.com
String lbConfigRaw21 = "{\"balancerName\" : \"dns:///balancer2.example.com:8080\"}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig21 = (Map<String, ?>) JsonParser.parse(lbConfigRaw21);
resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(eags)
.setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig21).build())
.build();
lookasideLb.handleResolvedAddresses(resolvedAddresses);
verify(localityStore1).reset();
verify(loadReportClient1).stopLoadReporting();
assertThat(helpers).hasSize(2); assertThat(helpers).hasSize(2);
assertThat(localityStores).hasSize(2); assertThat(localityStores).hasSize(2);
assertThat(loadReportClients).hasSize(2); Helper helper2 = helpers.peekLast();
Helper helper2 = helpers.get(1); LocalityStore localityStore2 = localityStores.peekLast();
LocalityStore localityStore2 = localityStores.get(1);
LoadReportClient loadReportClient2 = loadReportClients.get(1);
picker1 = mock(SubchannelPicker.class);
helper1.updateBalancingState(CONNECTING, picker1);
verify(helper, never()).updateBalancingState(CONNECTING, picker1);
SubchannelPicker picker2 = mock(SubchannelPicker.class); SubchannelPicker picker2 = mock(SubchannelPicker.class);
helper2.updateBalancingState(CONNECTING, picker2); helper2.updateBalancingState(CONNECTING, picker2);
// balancer1 was not READY, so balancer2 will update picker immediately
verify(helper).updateBalancingState(CONNECTING, picker2); verify(helper).updateBalancingState(CONNECTING, picker2);
verify(localityStore1).reset();
helper2.updateBalancingState(READY, picker2);
verify(helper).updateBalancingState(READY, picker2);
String lbConfigRaw22 = "{" // Change edsServiceName to edsServiceName3.
+ "\"balancerName\" : \"dns:///balancer2.example.com:8080\"," lbConfigRaw = "{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName3'}"
+ "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_2\" : {}}]" .replace("'", "\"");
+ "}"; lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig22 = (Map<String, ?>) JsonParser.parse(lbConfigRaw22);
resolvedAddresses = ResolvedAddresses.newBuilder() resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(eags) .setAddresses(eags)
.setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig22).build()) .setAttributes(Attributes.newBuilder()
.build(); .set(ATTR_LOAD_BALANCING_CONFIG, lbConfig)
lookasideLb.handleResolvedAddresses(resolvedAddresses); .set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
.build())
assertThat(helpers).hasSize(2);
assertThat(localityStores).hasSize(2);
SubchannelPicker picker3 = mock(SubchannelPicker.class);
helper2.updateBalancingState(READY, picker3);
verify(helper).updateBalancingState(READY, picker3);
String lbConfigRaw3 = "{"
+ "\"balancerName\" : \"dns:///balancer3.example.com:8080\","
+ "\"childPolicy\" : [{\"unsupported\" : {\"key\" : \"val\"}}, {\"unsupported_3\" : {}}]"
+ "}";
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig3 = (Map<String, ?>) JsonParser.parse(lbConfigRaw3);
resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(eags)
.setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig3).build())
.build(); .build();
lookasideLb.handleResolvedAddresses(resolvedAddresses); lookasideLb.handleResolvedAddresses(resolvedAddresses);
assertThat(helpers).hasSize(3); assertThat(helpers).hasSize(3);
assertThat(localityStores).hasSize(3);
Helper helper3 = helpers.peekLast();
LocalityStore localityStore3 = localityStores.peekLast();
Helper helper3 = helpers.get(2); SubchannelPicker picker3 = mock(SubchannelPicker.class);
SubchannelPicker picker4 = mock(SubchannelPicker.class); helper3.updateBalancingState(CONNECTING, picker3);
helper3.updateBalancingState(CONNECTING, picker4); verify(helper, never()).updateBalancingState(CONNECTING, picker3);
// balancer2 was READY, so balancer3 will gracefully switch and not update non-READY picker
verify(helper, never()).updateBalancingState(any(ConnectivityState.class), eq(picker4));
verify(localityStore2, never()).reset(); verify(localityStore2, never()).reset();
verify(loadReportClient2, never()).stopLoadReporting(); picker2 = mock(SubchannelPicker.class);
helper2.updateBalancingState(CONNECTING, picker2);
SubchannelPicker picker5 = mock(SubchannelPicker.class); // The old balancer becomes not READY, so the new balancer will update picker immediately.
helper3.updateBalancingState(READY, picker5); verify(helper).updateBalancingState(CONNECTING, picker3);
verify(helper).updateBalancingState(READY, picker5);
verify(localityStore2).reset(); verify(localityStore2).reset();
verify(loadReportClient2).stopLoadReporting();
verify(localityStores.get(2), never()).reset(); // Change edsServiceName to edsServiceName4.
verify(loadReportClients.get(2), never()).stopLoadReporting(); lbConfigRaw = "{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName4'}"
.replace("'", "\"");
lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(eags)
.setAttributes(Attributes.newBuilder()
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig)
.set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
.build())
.build();
lookasideLb.handleResolvedAddresses(resolvedAddresses);
assertThat(helpers).hasSize(4);
assertThat(localityStores).hasSize(4);
Helper helper4 = helpers.peekLast();
LocalityStore localityStore4 = localityStores.peekLast();
verify(localityStore3).reset();
SubchannelPicker picker4 = mock(SubchannelPicker.class);
helper4.updateBalancingState(READY, picker4);
verify(helper).updateBalancingState(READY, picker4);
// Change edsServiceName to edsServiceName5.
lbConfigRaw = "{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName5'}"
.replace("'", "\"");
lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(eags)
.setAttributes(Attributes.newBuilder()
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig)
.set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
.build())
.build();
lookasideLb.handleResolvedAddresses(resolvedAddresses);
assertThat(helpers).hasSize(5);
assertThat(localityStores).hasSize(5);
Helper helper5 = helpers.peekLast();
LocalityStore localityStore5 = localityStores.peekLast();
SubchannelPicker picker5 = mock(SubchannelPicker.class);
helper5.updateBalancingState(CONNECTING, picker5);
// The old balancer was READY, so the new balancer will gracefully switch and not update
// non-READY picker.
verify(helper, never()).updateBalancingState(any(ConnectivityState.class), eq(picker5));
verify(localityStore4, never()).reset();
helper5.updateBalancingState(READY, picker5);
verify(helper).updateBalancingState(READY, picker5);
verify(localityStore4).reset();
verify(localityStore5, never()).reset();
lookasideLb.shutdown(); lookasideLb.shutdown();
verify(localityStores.get(2)).reset(); verify(localityStore5).reset();
verify(loadReportClients.get(2)).stopLoadReporting();
xdsClientRef.returnObject(xdsClientFromResolver);
} }
@Deprecated // balancerName will be unsupported.
@Test @Test
public void handleResolvedAddress_createLbChannel() public void handleResolvedAddress_createLbChannel()
throws Exception { throws Exception {
// Test balancer created with the default real LookasideChannelLbFactory // Test balancer created with the default real LookasideChannelLbFactory
lookasideLb = new LookasideLb(helper, mock(EdsUpdateCallback.class)); lookasideLb = new LookasideLb(helper, mock(EndpointUpdateCallback.class));
String lbConfigRaw11 = "{'balancerName' : 'dns:///balancer1.example.com:8080'}" String lbConfigRaw = "{'balancerName' : 'dns:///balancer1.example.com:8080'}"
.replace("'", "\""); .replace("'", "\"");
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String, ?> lbConfig11 = (Map<String, ?>) JsonParser.parse(lbConfigRaw11); Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder() ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of()) .setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig11).build()) .setAttributes(Attributes.newBuilder().set(ATTR_LOAD_BALANCING_CONFIG, lbConfig).build())
.build(); .build();
verify(helper, never()).createResolvingOobChannel(anyString()); verify(helper, never()).createResolvingOobChannel(anyString());
@ -374,6 +487,88 @@ public class LookasideLbTest {
lookasideLb.shutdown(); lookasideLb.shutdown();
} }
@Test
public void handleResolvedAddress_withBootstrap() throws Exception {
BootstrapInfo bootstrapInfo = new BootstrapInfo(
"trafficdirector.googleapis.com", ImmutableList.<ChannelCreds>of(),
Node.getDefaultInstance());
doReturn(bootstrapInfo).when(bootstrapper).readBootstrap();
String lbConfigRaw =
"{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName1'}"
.replace("'", "\"");
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(Attributes.newBuilder()
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig)
.build())
.build();
verify(helper, never()).createResolvingOobChannel(anyString());
lookasideLb.handleResolvedAddresses(resolvedAddresses);
verify(helper).createResolvingOobChannel("trafficdirector.googleapis.com");
assertThat(helpers).hasSize(1);
assertThat(localityStores).hasSize(1);
Helper helper1 = helpers.peekLast();
LocalityStore localityStore1 = localityStores.peekLast();
SubchannelPicker picker = mock(SubchannelPicker.class);
helper1.updateBalancingState(READY, picker);
verify(helper).updateBalancingState(READY, picker);
lookasideLb.shutdown();
verify(localityStore1).reset();
}
@Test
public void handleResolvedAddress_withXdsClientRefAttributes() throws Exception {
XdsClientFactory xdsClientFactory = new XdsClientFactory() {
@Override
XdsClient createXdsClient() {
return mock(XdsClient.class);
}
};
ObjectPool<XdsClient> xdsClientRef = new RefCountedXdsClientObjectPool(xdsClientFactory);
XdsClient xdsClientFromResolver = xdsClientRef.getObject();
String lbConfigRaw =
"{'childPolicy' : [{'supported1' : {}}], 'edsServiceName' : 'edsServiceName1'}"
.replace("'", "\"");
@SuppressWarnings("unchecked")
Map<String, ?> lbConfig = (Map<String, ?>) JsonParser.parse(lbConfigRaw);
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(Attributes.newBuilder()
.set(ATTR_LOAD_BALANCING_CONFIG, lbConfig)
.set(XdsAttributes.XDS_CLIENT_REF, xdsClientRef)
.build())
.build();
lookasideLb.handleResolvedAddresses(resolvedAddresses);
assertThat(helpers).hasSize(1);
assertThat(localityStores).hasSize(1);
ArgumentCaptor<EndpointWatcher> endpointWatcherCaptor =
ArgumentCaptor.forClass(EndpointWatcher.class);
verify(xdsClientFromResolver).watchEndpointData(
eq("edsServiceName1"), endpointWatcherCaptor.capture());
EndpointWatcher endpointWatcher = endpointWatcherCaptor.getValue();
Helper helper1 = helpers.peekLast();
SubchannelPicker picker = mock(SubchannelPicker.class);
helper1.updateBalancingState(READY, picker);
verify(helper).updateBalancingState(READY, picker);
// Mimic resolver shutdown
xdsClientRef.returnObject(xdsClientFromResolver);
verify(xdsClientFromResolver, never()).shutdown();
lookasideLb.shutdown();
verify(xdsClientFromResolver).cancelEndpointDataWatch("edsServiceName1", endpointWatcher);
verify(xdsClientFromResolver).shutdown();
}
@Test @Test
public void firstAndSecondEdsResponseReceived() { public void firstAndSecondEdsResponseReceived() {
lookasideLb.handleResolvedAddresses(defaultResolvedAddress); lookasideLb.handleResolvedAddresses(defaultResolvedAddress);

View File

@ -40,7 +40,7 @@ import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock;
import io.grpc.xds.LookasideLb.EdsUpdateCallback; import io.grpc.xds.LookasideLb.EndpointUpdateCallback;
import io.grpc.xds.XdsLoadBalancer2.LookasideLbFactory; import io.grpc.xds.XdsLoadBalancer2.LookasideLbFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -75,7 +75,7 @@ public class XdsLoadBalancer2Test {
@Mock @Mock
private Helper helper; private Helper helper;
private LoadBalancer xdsLoadBalancer; private LoadBalancer xdsLoadBalancer;
private EdsUpdateCallback edsUpdateCallback; private EndpointUpdateCallback edsUpdateCallback;
private Helper lookasideLbHelper; private Helper lookasideLbHelper;
private final List<LoadBalancer> lookasideLbs = new ArrayList<>(); private final List<LoadBalancer> lookasideLbs = new ArrayList<>();
@ -90,7 +90,7 @@ public class XdsLoadBalancer2Test {
LookasideLbFactory lookasideLbFactory = new LookasideLbFactory() { LookasideLbFactory lookasideLbFactory = new LookasideLbFactory() {
@Override @Override
public LoadBalancer newLoadBalancer( public LoadBalancer newLoadBalancer(
Helper helper, EdsUpdateCallback edsUpdateCallback) { Helper helper, EndpointUpdateCallback edsUpdateCallback) {
// just return a mock and record the input and output // just return a mock and record the input and output
lookasideLbHelper = helper; lookasideLbHelper = helper;
XdsLoadBalancer2Test.this.edsUpdateCallback = edsUpdateCallback; XdsLoadBalancer2Test.this.edsUpdateCallback = edsUpdateCallback;