xds: run watcher callbacks in its own channel synchronization context (#7525)

In the context of sharing the XdsClient instance between Channels, watcher callbacks need to be executed in each Channel's own SynchronizationContext.
This commit is contained in:
Chengyuan Zhang 2020-10-21 13:06:08 -07:00 committed by GitHub
parent 0b6e6e5fd5
commit 19485014fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 273 additions and 163 deletions

View File

@ -29,6 +29,7 @@ import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.util.ForwardingLoadBalancerHelper;
@ -52,6 +53,7 @@ import javax.annotation.Nullable;
final class CdsLoadBalancer extends LoadBalancer {
private final XdsLogger logger;
private final Helper helper;
private final SynchronizationContext syncContext;
private final LoadBalancerRegistry lbRegistry;
private final TlsContextManager tlsContextManager;
// TODO(sanjaypujare): remove once xds security is released
@ -71,6 +73,7 @@ final class CdsLoadBalancer extends LoadBalancer {
CdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry,
TlsContextManager tlsContextManager) {
this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.lbRegistry = lbRegistry;
this.tlsContextManager = tlsContextManager;
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
@ -179,6 +182,7 @@ final class CdsLoadBalancer extends LoadBalancer {
private final class CdsLbState implements CdsResourceWatcher {
private final ChannelSecurityLbHelper lbHelper = new ChannelSecurityLbHelper();
private boolean shutdown;
@Nullable
LoadBalancer edsBalancer;
@ -189,37 +193,43 @@ final class CdsLoadBalancer extends LoadBalancer {
}
@Override
public void onChanged(CdsUpdate newUpdate) {
if (logger.isLoggable(XdsLogLevel.INFO)) {
logger.log(
XdsLogLevel.INFO,
"Received cluster update from xDS client {0}: "
+ "cluster_name={1}, eds_service_name={2}, lb_policy={3}, report_load={4}",
xdsClient, newUpdate.getClusterName(), newUpdate.getEdsServiceName(),
newUpdate.getLbPolicy(), newUpdate.getLrsServerName() != null);
}
// FIXME(chengyuanzhang): handle error correctly to avoid being unnecessarily fragile.
checkArgument(
newUpdate.getLbPolicy().equals("round_robin"), "can only support round_robin policy");
LoadBalancerProvider endpointPickingPolicyProvider =
lbRegistry.getProvider(newUpdate.getLbPolicy());
LoadBalancerProvider localityPickingPolicyProvider =
lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME); // hardcode to weighted-target
final EdsConfig edsConfig =
new EdsConfig(
/* clusterName = */ newUpdate.getClusterName(),
/* edsServiceName = */ newUpdate.getEdsServiceName(),
/* lrsServerName = */ newUpdate.getLrsServerName(),
new PolicySelection(localityPickingPolicyProvider, null /* by EDS policy */),
new PolicySelection(endpointPickingPolicyProvider, null));
if (isXdsSecurityEnabled()) {
updateSslContextProviderSupplier(newUpdate.getUpstreamTlsContext());
}
if (edsBalancer == null) {
edsBalancer = lbRegistry.getProvider(EDS_POLICY_NAME).newLoadBalancer(lbHelper);
}
edsBalancer.handleResolvedAddresses(
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(edsConfig).build());
public void onChanged(final CdsUpdate newUpdate) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
if (logger.isLoggable(XdsLogLevel.INFO)) {
logger.log(XdsLogLevel.INFO, "Received cluster update from xDS client {0}: "
+ "cluster_name={1}, eds_service_name={2}, lb_policy={3}, report_load={4}",
xdsClient, newUpdate.getClusterName(), newUpdate.getEdsServiceName(),
newUpdate.getLbPolicy(), newUpdate.getLrsServerName() != null);
}
// FIXME(chengyuanzhang): handle error correctly to avoid being unnecessarily fragile.
checkArgument(newUpdate.getLbPolicy().equals("round_robin"),
"can only support round_robin policy");
LoadBalancerProvider endpointPickingPolicyProvider =
lbRegistry.getProvider(newUpdate.getLbPolicy());
LoadBalancerProvider localityPickingPolicyProvider =
lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME); // hardcode to weighted-target
final EdsConfig edsConfig =
new EdsConfig(
/* clusterName = */ newUpdate.getClusterName(),
/* edsServiceName = */ newUpdate.getEdsServiceName(),
/* lrsServerName = */ newUpdate.getLrsServerName(),
new PolicySelection(localityPickingPolicyProvider, null /* by EDS policy */),
new PolicySelection(endpointPickingPolicyProvider, null));
if (isXdsSecurityEnabled()) {
updateSslContextProviderSupplier(newUpdate.getUpstreamTlsContext());
}
if (edsBalancer == null) {
edsBalancer = lbRegistry.getProvider(EDS_POLICY_NAME).newLoadBalancer(lbHelper);
}
edsBalancer.handleResolvedAddresses(
resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(edsConfig).build());
}
});
}
/** For new UpstreamTlsContext value, release old SslContextProvider. */
@ -244,32 +254,49 @@ final class CdsLoadBalancer extends LoadBalancer {
}
@Override
public void onResourceDoesNotExist(String resourceName) {
logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName);
if (edsBalancer != null) {
edsBalancer.shutdown();
edsBalancer = null;
}
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(
Status.UNAVAILABLE.withDescription("Resource " + resourceName + " is unavailable")));
public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName);
if (edsBalancer != null) {
edsBalancer.shutdown();
edsBalancer = null;
}
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(Status.UNAVAILABLE.withDescription(
"Resource " + resourceName + " is unavailable")));
}
});
}
@Override
public void onError(Status error) {
logger.log(
XdsLogLevel.WARNING,
"Received error from xDS client {0}: {1}: {2}",
xdsClient,
error.getCode(),
error.getDescription());
if (edsBalancer == null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
logger.log(
XdsLogLevel.WARNING,
"Received error from xDS client {0}: {1}: {2}",
xdsClient,
error.getCode(),
error.getDescription());
if (edsBalancer == null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
});
}
void shutdown() {
shutdown = true;
xdsClient.cancelCdsResourceWatch(clusterName, this);
logger.log(XdsLogLevel.INFO,
"Cancelled watcher for cluster {0} with xDS client {1}", clusterName, xdsClient);

View File

@ -30,6 +30,7 @@ import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.util.ForwardingLoadBalancerHelper;
@ -60,6 +61,7 @@ import javax.annotation.Nullable;
final class EdsLoadBalancer2 extends LoadBalancer {
private final XdsLogger logger;
private final SynchronizationContext syncContext;
private final LoadBalancerRegistry lbRegistry;
private final ThreadSafeRandom random;
private final GracefulSwitchLoadBalancer switchingLoadBalancer;
@ -77,7 +79,8 @@ final class EdsLoadBalancer2 extends LoadBalancer {
LoadBalancer.Helper helper, LoadBalancerRegistry lbRegistry, ThreadSafeRandom random) {
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
this.random = checkNotNull(random, "random");
switchingLoadBalancer = new GracefulSwitchLoadBalancer(checkNotNull(helper, "helper"));
syncContext = checkNotNull(helper, "helper").getSynchronizationContext();
switchingLoadBalancer = new GracefulSwitchLoadBalancer(helper);
InternalLogId logId = InternalLogId.allocate("eds-lb", helper.getAuthority());
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created");
@ -156,6 +159,7 @@ final class EdsLoadBalancer2 extends LoadBalancer {
private ResolvedAddresses resolvedAddresses;
private PolicySelection localityPickingPolicy;
private PolicySelection endpointPickingPolicy;
private boolean shutdown;
@Nullable
private LoadBalancer lb;
@ -216,6 +220,7 @@ final class EdsLoadBalancer2 extends LoadBalancer {
@Override
public void shutdown() {
shutdown = true;
if (lrsServerName != null) {
xdsClient.removeClientStats(cluster, edsServiceName);
}
@ -234,89 +239,112 @@ final class EdsLoadBalancer2 extends LoadBalancer {
}
@Override
public void onChanged(EdsUpdate update) {
logger.log(XdsLogLevel.DEBUG,
"Received endpoint update from xDS client {0}: {1}", xdsClient, update);
if (logger.isLoggable(XdsLogLevel.INFO)) {
logger.log(
XdsLogLevel.INFO,
"Received endpoint update: cluster_name={0}, {1} localities, {2} drop categories",
update.getClusterName(), update.getLocalityLbEndpointsMap().size(),
update.getDropPolicies().size());
}
lbHelper.updateDropPolicies(update.getDropPolicies());
Map<Locality, LocalityLbEndpoints> localityLbEndpoints = update.getLocalityLbEndpointsMap();
endpointAddresses = new ArrayList<>();
prioritizedLocalityWeights = new HashMap<>();
for (Locality locality : localityLbEndpoints.keySet()) {
LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
int priority = localityLbInfo.getPriority();
boolean discard = true;
for (LbEndpoint endpoint : localityLbInfo.getEndpoints()) {
if (endpoint.isHealthy()) {
discard = false;
EquivalentAddressGroup eag =
AddressFilter.setPathFilter(
endpoint.getAddress(),
Arrays.asList(priorityName(priority), localityName(locality)));
endpointAddresses.add(eag);
public void onChanged(final EdsUpdate update) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
logger.log(XdsLogLevel.DEBUG,
"Received endpoint update from xDS client {0}: {1}", xdsClient, update);
if (logger.isLoggable(XdsLogLevel.INFO)) {
logger.log(XdsLogLevel.INFO, "Received endpoint update: cluster_name={0}, "
+ "{1} localities, {2} drop categories", update.getClusterName(),
update.getLocalityLbEndpointsMap().size(), update.getDropPolicies().size());
}
lbHelper.updateDropPolicies(update.getDropPolicies());
Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
update.getLocalityLbEndpointsMap();
endpointAddresses = new ArrayList<>();
prioritizedLocalityWeights = new HashMap<>();
for (Locality locality : localityLbEndpoints.keySet()) {
LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
int priority = localityLbInfo.getPriority();
boolean discard = true;
for (LbEndpoint endpoint : localityLbInfo.getEndpoints()) {
if (endpoint.isHealthy()) {
discard = false;
EquivalentAddressGroup eag =
AddressFilter.setPathFilter(
endpoint.getAddress(),
Arrays.asList(priorityName(priority), localityName(locality)));
endpointAddresses.add(eag);
}
}
if (discard) {
logger.log(XdsLogLevel.INFO, "Discard locality {0} with 0 healthy endpoints");
continue;
}
if (!prioritizedLocalityWeights.containsKey(priority)) {
prioritizedLocalityWeights.put(priority, new HashMap<Locality, Integer>());
}
prioritizedLocalityWeights.get(priority).put(
locality, localityLbInfo.getLocalityWeight());
}
if (prioritizedLocalityWeights.isEmpty()) {
propagateResourceError(
Status.UNAVAILABLE.withDescription("No usable priority/locality/endpoint"));
return;
}
if (lb == null) {
lb = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(lbHelper);
}
if (localityPickingPolicy != null && endpointPickingPolicy != null) {
PriorityLbConfig config = generatePriorityLbConfig(cluster, edsServiceName,
lrsServerName, localityPickingPolicy, endpointPickingPolicy, lbRegistry,
prioritizedLocalityWeights);
// TODO(chengyuanzhang): to be deleted after migrating to use XdsClient API.
Attributes attributes;
if (lrsServerName != null) {
attributes =
resolvedAddresses.getAttributes().toBuilder()
.set(XdsAttributes.ATTR_CLUSTER_SERVICE_LOAD_STATS_STORE, loadStatsStore)
.build();
} else {
attributes = resolvedAddresses.getAttributes();
}
lb.handleResolvedAddresses(
resolvedAddresses.toBuilder()
.setAddresses(endpointAddresses)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(config)
.build());
}
}
if (discard) {
logger.log(XdsLogLevel.INFO, "Discard locality {0} with 0 healthy endpoints");
continue;
}
if (!prioritizedLocalityWeights.containsKey(priority)) {
prioritizedLocalityWeights.put(priority, new HashMap<Locality, Integer>());
}
prioritizedLocalityWeights.get(priority).put(
locality, localityLbInfo.getLocalityWeight());
}
if (prioritizedLocalityWeights.isEmpty()) {
propagateResourceError(
Status.UNAVAILABLE.withDescription("No usable priority/locality/endpoint"));
return;
}
if (lb == null) {
lb = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(lbHelper);
}
if (localityPickingPolicy != null && endpointPickingPolicy != null) {
PriorityLbConfig config = generatePriorityLbConfig(cluster, edsServiceName,
lrsServerName, localityPickingPolicy, endpointPickingPolicy, lbRegistry,
prioritizedLocalityWeights);
// TODO(chengyuanzhang): to be deleted after migrating to use XdsClient API.
Attributes attributes;
if (lrsServerName != null) {
attributes =
resolvedAddresses.getAttributes().toBuilder()
.set(XdsAttributes.ATTR_CLUSTER_SERVICE_LOAD_STATS_STORE, loadStatsStore)
.build();
} else {
attributes = resolvedAddresses.getAttributes();
}
lb.handleResolvedAddresses(
resolvedAddresses.toBuilder()
.setAddresses(endpointAddresses)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(config)
.build());
}
});
}
@Override
public void onResourceDoesNotExist(String resourceName) {
logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName);
propagateResourceError(
Status.UNAVAILABLE.withDescription("Resource " + resourceName + " is unavailable"));
public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName);
propagateResourceError(Status.UNAVAILABLE.withDescription(
"Resource " + resourceName + " is unavailable"));
}
});
}
@Override
public void onError(Status error) {
logger.log(
XdsLogLevel.WARNING, "Received error from xDS client {0}: {1}", xdsClient, error);
if (lb == null) {
lbHelper.helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
logger.log(
XdsLogLevel.WARNING, "Received error from xDS client {0}: {1}", xdsClient, error);
if (lb == null) {
lbHelper.helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
}
}
});
}
private void propagateResourceError(Status error) {

View File

@ -430,6 +430,7 @@ final class XdsNameResolver extends NameResolver {
.setServiceConfig(emptyServiceConfig)
// let channel take action for no config selector
.build();
private boolean stopped;
private Set<String> existingClusters;
@Nullable
private String rdsResource;
@ -438,38 +439,62 @@ final class XdsNameResolver extends NameResolver {
private long httpMaxStreamDurationNano;
@Override
public void onChanged(LdsUpdate update) {
logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
httpMaxStreamDurationNano = update.getHttpMaxStreamDurationNano();
List<VirtualHost> virtualHosts = update.getVirtualHosts();
String rdsName = update.getRdsName();
if (rdsName != null && rdsName.equals(rdsResource)) {
return;
}
cleanUpRdsWatcher();
if (virtualHosts != null) {
updateRoutes(virtualHosts);
} else {
rdsResource = rdsName;
rdsWatcher = new RdsResourceWatcherImpl();
logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsResource);
xdsClient.watchRdsResource(rdsResource, rdsWatcher);
}
public void onChanged(final LdsUpdate update) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (stopped) {
return;
}
logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
httpMaxStreamDurationNano = update.getHttpMaxStreamDurationNano();
List<VirtualHost> virtualHosts = update.getVirtualHosts();
String rdsName = update.getRdsName();
if (rdsName != null && rdsName.equals(rdsResource)) {
return;
}
cleanUpRdsWatcher();
if (virtualHosts != null) {
updateRoutes(virtualHosts);
} else {
rdsResource = rdsName;
rdsWatcher = new RdsResourceWatcherImpl();
logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsResource);
xdsClient.watchRdsResource(rdsResource, rdsWatcher);
}
}
});
}
@Override
public void onError(Status error) {
logger.log(
XdsLogLevel.WARNING,
"Received error from xDS client {0}: {1}", xdsClient, error.getDescription());
listener.onError(error);
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (stopped) {
return;
}
logger.log(
XdsLogLevel.WARNING,
"Received error from xDS client {0}: {1}", xdsClient, error.getDescription());
listener.onError(error);
}
});
}
@Override
public void onResourceDoesNotExist(String resourceName) {
logger.log(XdsLogLevel.INFO, "LDS resource {0} unavailable", resourceName);
cleanUpRdsWatcher();
listener.onResult(emptyResult);
public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (stopped) {
return;
}
logger.log(XdsLogLevel.INFO, "LDS resource {0} unavailable", resourceName);
cleanUpRdsWatcher();
listener.onResult(emptyResult);
}
});
}
private void start() {
@ -479,6 +504,7 @@ final class XdsNameResolver extends NameResolver {
private void stop() {
logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", authority);
stopped = true;
cleanUpRdsWatcher();
xdsClient.cancelLdsResourceWatch(authority, this);
}
@ -550,22 +576,46 @@ final class XdsNameResolver extends NameResolver {
private class RdsResourceWatcherImpl implements RdsResourceWatcher {
@Override
public void onChanged(RdsUpdate update) {
updateRoutes(update.getVirtualHosts());
public void onChanged(final RdsUpdate update) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (RdsResourceWatcherImpl.this != rdsWatcher) {
return;
}
updateRoutes(update.getVirtualHosts());
}
});
}
@Override
public void onError(Status error) {
logger.log(
XdsLogLevel.WARNING,
"Received error from xDS client {0}: {1}", xdsClient, error.getDescription());
listener.onError(error);
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (RdsResourceWatcherImpl.this != rdsWatcher) {
return;
}
logger.log(
XdsLogLevel.WARNING,
"Received error from xDS client {0}: {1}", xdsClient, error.getDescription());
listener.onError(error);
}
});
}
@Override
public void onResourceDoesNotExist(String resourceName) {
logger.log(XdsLogLevel.INFO, "RDS resource {0} unavailable", resourceName);
listener.onResult(emptyResult);
public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (RdsResourceWatcherImpl.this != rdsWatcher) {
return;
}
logger.log(XdsLogLevel.INFO, "RDS resource {0} unavailable", resourceName);
listener.onResult(emptyResult);
}
});
}
}
}

View File

@ -467,6 +467,11 @@ public class CdsLoadBalancerTest {
throw new UnsupportedOperationException("should not be called");
}
@Override
public SynchronizationContext getSynchronizationContext() {
return syncContext;
}
@Deprecated
@Override
public NameResolver.Factory getNameResolverFactory() {