diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java index 8ad0b0de9a..545e574de2 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer.java @@ -29,6 +29,7 @@ import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; import io.grpc.Status; +import io.grpc.SynchronizationContext; import io.grpc.internal.ObjectPool; import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.util.ForwardingLoadBalancerHelper; @@ -52,6 +53,7 @@ import javax.annotation.Nullable; final class CdsLoadBalancer extends LoadBalancer { private final XdsLogger logger; private final Helper helper; + private final SynchronizationContext syncContext; private final LoadBalancerRegistry lbRegistry; private final TlsContextManager tlsContextManager; // TODO(sanjaypujare): remove once xds security is released @@ -71,6 +73,7 @@ final class CdsLoadBalancer extends LoadBalancer { CdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, TlsContextManager tlsContextManager) { this.helper = checkNotNull(helper, "helper"); + this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); this.lbRegistry = lbRegistry; this.tlsContextManager = tlsContextManager; logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority())); @@ -179,6 +182,7 @@ final class CdsLoadBalancer extends LoadBalancer { private final class CdsLbState implements CdsResourceWatcher { private final ChannelSecurityLbHelper lbHelper = new ChannelSecurityLbHelper(); + private boolean shutdown; @Nullable LoadBalancer edsBalancer; @@ -189,37 +193,43 @@ final class CdsLoadBalancer extends LoadBalancer { } @Override - public void onChanged(CdsUpdate newUpdate) { - if (logger.isLoggable(XdsLogLevel.INFO)) { - logger.log( - XdsLogLevel.INFO, - "Received cluster update from xDS client {0}: " - + "cluster_name={1}, eds_service_name={2}, lb_policy={3}, report_load={4}", - xdsClient, newUpdate.getClusterName(), newUpdate.getEdsServiceName(), - newUpdate.getLbPolicy(), newUpdate.getLrsServerName() != null); - } - // FIXME(chengyuanzhang): handle error correctly to avoid being unnecessarily fragile. - checkArgument( - newUpdate.getLbPolicy().equals("round_robin"), "can only support round_robin policy"); - LoadBalancerProvider endpointPickingPolicyProvider = - lbRegistry.getProvider(newUpdate.getLbPolicy()); - LoadBalancerProvider localityPickingPolicyProvider = - lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME); // hardcode to weighted-target - final EdsConfig edsConfig = - new EdsConfig( - /* clusterName = */ newUpdate.getClusterName(), - /* edsServiceName = */ newUpdate.getEdsServiceName(), - /* lrsServerName = */ newUpdate.getLrsServerName(), - new PolicySelection(localityPickingPolicyProvider, null /* by EDS policy */), - new PolicySelection(endpointPickingPolicyProvider, null)); - if (isXdsSecurityEnabled()) { - updateSslContextProviderSupplier(newUpdate.getUpstreamTlsContext()); - } - if (edsBalancer == null) { - edsBalancer = lbRegistry.getProvider(EDS_POLICY_NAME).newLoadBalancer(lbHelper); - } - edsBalancer.handleResolvedAddresses( - resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(edsConfig).build()); + public void onChanged(final CdsUpdate newUpdate) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (shutdown) { + return; + } + if (logger.isLoggable(XdsLogLevel.INFO)) { + logger.log(XdsLogLevel.INFO, "Received cluster update from xDS client {0}: " + + "cluster_name={1}, eds_service_name={2}, lb_policy={3}, report_load={4}", + xdsClient, newUpdate.getClusterName(), newUpdate.getEdsServiceName(), + newUpdate.getLbPolicy(), newUpdate.getLrsServerName() != null); + } + // FIXME(chengyuanzhang): handle error correctly to avoid being unnecessarily fragile. + checkArgument(newUpdate.getLbPolicy().equals("round_robin"), + "can only support round_robin policy"); + LoadBalancerProvider endpointPickingPolicyProvider = + lbRegistry.getProvider(newUpdate.getLbPolicy()); + LoadBalancerProvider localityPickingPolicyProvider = + lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME); // hardcode to weighted-target + final EdsConfig edsConfig = + new EdsConfig( + /* clusterName = */ newUpdate.getClusterName(), + /* edsServiceName = */ newUpdate.getEdsServiceName(), + /* lrsServerName = */ newUpdate.getLrsServerName(), + new PolicySelection(localityPickingPolicyProvider, null /* by EDS policy */), + new PolicySelection(endpointPickingPolicyProvider, null)); + if (isXdsSecurityEnabled()) { + updateSslContextProviderSupplier(newUpdate.getUpstreamTlsContext()); + } + if (edsBalancer == null) { + edsBalancer = lbRegistry.getProvider(EDS_POLICY_NAME).newLoadBalancer(lbHelper); + } + edsBalancer.handleResolvedAddresses( + resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(edsConfig).build()); + } + }); } /** For new UpstreamTlsContext value, release old SslContextProvider. */ @@ -244,32 +254,49 @@ final class CdsLoadBalancer extends LoadBalancer { } @Override - public void onResourceDoesNotExist(String resourceName) { - logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName); - if (edsBalancer != null) { - edsBalancer.shutdown(); - edsBalancer = null; - } - helper.updateBalancingState( - TRANSIENT_FAILURE, - new ErrorPicker( - Status.UNAVAILABLE.withDescription("Resource " + resourceName + " is unavailable"))); + public void onResourceDoesNotExist(final String resourceName) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (shutdown) { + return; + } + logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName); + if (edsBalancer != null) { + edsBalancer.shutdown(); + edsBalancer = null; + } + helper.updateBalancingState( + TRANSIENT_FAILURE, + new ErrorPicker(Status.UNAVAILABLE.withDescription( + "Resource " + resourceName + " is unavailable"))); + } + }); } @Override - public void onError(Status error) { - logger.log( - XdsLogLevel.WARNING, - "Received error from xDS client {0}: {1}: {2}", - xdsClient, - error.getCode(), - error.getDescription()); - if (edsBalancer == null) { - helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); - } + public void onError(final Status error) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (shutdown) { + return; + } + logger.log( + XdsLogLevel.WARNING, + "Received error from xDS client {0}: {1}: {2}", + xdsClient, + error.getCode(), + error.getDescription()); + if (edsBalancer == null) { + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); + } + } + }); } void shutdown() { + shutdown = true; xdsClient.cancelCdsResourceWatch(clusterName, this); logger.log(XdsLogLevel.INFO, "Cancelled watcher for cluster {0} with xDS client {1}", clusterName, xdsClient); diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java index 549e46031d..4ebd11f731 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java @@ -30,6 +30,7 @@ import io.grpc.LoadBalancer; import io.grpc.LoadBalancerProvider; import io.grpc.LoadBalancerRegistry; import io.grpc.Status; +import io.grpc.SynchronizationContext; import io.grpc.internal.ObjectPool; import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.util.ForwardingLoadBalancerHelper; @@ -60,6 +61,7 @@ import javax.annotation.Nullable; final class EdsLoadBalancer2 extends LoadBalancer { private final XdsLogger logger; + private final SynchronizationContext syncContext; private final LoadBalancerRegistry lbRegistry; private final ThreadSafeRandom random; private final GracefulSwitchLoadBalancer switchingLoadBalancer; @@ -77,7 +79,8 @@ final class EdsLoadBalancer2 extends LoadBalancer { LoadBalancer.Helper helper, LoadBalancerRegistry lbRegistry, ThreadSafeRandom random) { this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry"); this.random = checkNotNull(random, "random"); - switchingLoadBalancer = new GracefulSwitchLoadBalancer(checkNotNull(helper, "helper")); + syncContext = checkNotNull(helper, "helper").getSynchronizationContext(); + switchingLoadBalancer = new GracefulSwitchLoadBalancer(helper); InternalLogId logId = InternalLogId.allocate("eds-lb", helper.getAuthority()); logger = XdsLogger.withLogId(logId); logger.log(XdsLogLevel.INFO, "Created"); @@ -156,6 +159,7 @@ final class EdsLoadBalancer2 extends LoadBalancer { private ResolvedAddresses resolvedAddresses; private PolicySelection localityPickingPolicy; private PolicySelection endpointPickingPolicy; + private boolean shutdown; @Nullable private LoadBalancer lb; @@ -216,6 +220,7 @@ final class EdsLoadBalancer2 extends LoadBalancer { @Override public void shutdown() { + shutdown = true; if (lrsServerName != null) { xdsClient.removeClientStats(cluster, edsServiceName); } @@ -234,89 +239,112 @@ final class EdsLoadBalancer2 extends LoadBalancer { } @Override - public void onChanged(EdsUpdate update) { - logger.log(XdsLogLevel.DEBUG, - "Received endpoint update from xDS client {0}: {1}", xdsClient, update); - if (logger.isLoggable(XdsLogLevel.INFO)) { - logger.log( - XdsLogLevel.INFO, - "Received endpoint update: cluster_name={0}, {1} localities, {2} drop categories", - update.getClusterName(), update.getLocalityLbEndpointsMap().size(), - update.getDropPolicies().size()); - } - lbHelper.updateDropPolicies(update.getDropPolicies()); - Map localityLbEndpoints = update.getLocalityLbEndpointsMap(); - endpointAddresses = new ArrayList<>(); - prioritizedLocalityWeights = new HashMap<>(); - for (Locality locality : localityLbEndpoints.keySet()) { - LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality); - int priority = localityLbInfo.getPriority(); - boolean discard = true; - for (LbEndpoint endpoint : localityLbInfo.getEndpoints()) { - if (endpoint.isHealthy()) { - discard = false; - EquivalentAddressGroup eag = - AddressFilter.setPathFilter( - endpoint.getAddress(), - Arrays.asList(priorityName(priority), localityName(locality))); - endpointAddresses.add(eag); + public void onChanged(final EdsUpdate update) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (shutdown) { + return; + } + logger.log(XdsLogLevel.DEBUG, + "Received endpoint update from xDS client {0}: {1}", xdsClient, update); + if (logger.isLoggable(XdsLogLevel.INFO)) { + logger.log(XdsLogLevel.INFO, "Received endpoint update: cluster_name={0}, " + + "{1} localities, {2} drop categories", update.getClusterName(), + update.getLocalityLbEndpointsMap().size(), update.getDropPolicies().size()); + } + lbHelper.updateDropPolicies(update.getDropPolicies()); + Map localityLbEndpoints = + update.getLocalityLbEndpointsMap(); + endpointAddresses = new ArrayList<>(); + prioritizedLocalityWeights = new HashMap<>(); + for (Locality locality : localityLbEndpoints.keySet()) { + LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality); + int priority = localityLbInfo.getPriority(); + boolean discard = true; + for (LbEndpoint endpoint : localityLbInfo.getEndpoints()) { + if (endpoint.isHealthy()) { + discard = false; + EquivalentAddressGroup eag = + AddressFilter.setPathFilter( + endpoint.getAddress(), + Arrays.asList(priorityName(priority), localityName(locality))); + endpointAddresses.add(eag); + } + } + if (discard) { + logger.log(XdsLogLevel.INFO, "Discard locality {0} with 0 healthy endpoints"); + continue; + } + if (!prioritizedLocalityWeights.containsKey(priority)) { + prioritizedLocalityWeights.put(priority, new HashMap()); + } + prioritizedLocalityWeights.get(priority).put( + locality, localityLbInfo.getLocalityWeight()); + } + if (prioritizedLocalityWeights.isEmpty()) { + propagateResourceError( + Status.UNAVAILABLE.withDescription("No usable priority/locality/endpoint")); + return; + } + if (lb == null) { + lb = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(lbHelper); + } + if (localityPickingPolicy != null && endpointPickingPolicy != null) { + PriorityLbConfig config = generatePriorityLbConfig(cluster, edsServiceName, + lrsServerName, localityPickingPolicy, endpointPickingPolicy, lbRegistry, + prioritizedLocalityWeights); + // TODO(chengyuanzhang): to be deleted after migrating to use XdsClient API. + Attributes attributes; + if (lrsServerName != null) { + attributes = + resolvedAddresses.getAttributes().toBuilder() + .set(XdsAttributes.ATTR_CLUSTER_SERVICE_LOAD_STATS_STORE, loadStatsStore) + .build(); + } else { + attributes = resolvedAddresses.getAttributes(); + } + lb.handleResolvedAddresses( + resolvedAddresses.toBuilder() + .setAddresses(endpointAddresses) + .setAttributes(attributes) + .setLoadBalancingPolicyConfig(config) + .build()); } } - if (discard) { - logger.log(XdsLogLevel.INFO, "Discard locality {0} with 0 healthy endpoints"); - continue; - } - if (!prioritizedLocalityWeights.containsKey(priority)) { - prioritizedLocalityWeights.put(priority, new HashMap()); - } - prioritizedLocalityWeights.get(priority).put( - locality, localityLbInfo.getLocalityWeight()); - } - if (prioritizedLocalityWeights.isEmpty()) { - propagateResourceError( - Status.UNAVAILABLE.withDescription("No usable priority/locality/endpoint")); - return; - } - if (lb == null) { - lb = lbRegistry.getProvider(PRIORITY_POLICY_NAME).newLoadBalancer(lbHelper); - } - if (localityPickingPolicy != null && endpointPickingPolicy != null) { - PriorityLbConfig config = generatePriorityLbConfig(cluster, edsServiceName, - lrsServerName, localityPickingPolicy, endpointPickingPolicy, lbRegistry, - prioritizedLocalityWeights); - // TODO(chengyuanzhang): to be deleted after migrating to use XdsClient API. - Attributes attributes; - if (lrsServerName != null) { - attributes = - resolvedAddresses.getAttributes().toBuilder() - .set(XdsAttributes.ATTR_CLUSTER_SERVICE_LOAD_STATS_STORE, loadStatsStore) - .build(); - } else { - attributes = resolvedAddresses.getAttributes(); - } - lb.handleResolvedAddresses( - resolvedAddresses.toBuilder() - .setAddresses(endpointAddresses) - .setAttributes(attributes) - .setLoadBalancingPolicyConfig(config) - .build()); - } + }); } @Override - public void onResourceDoesNotExist(String resourceName) { - logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName); - propagateResourceError( - Status.UNAVAILABLE.withDescription("Resource " + resourceName + " is unavailable")); + public void onResourceDoesNotExist(final String resourceName) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (shutdown) { + return; + } + logger.log(XdsLogLevel.INFO, "Resource {0} is unavailable", resourceName); + propagateResourceError(Status.UNAVAILABLE.withDescription( + "Resource " + resourceName + " is unavailable")); + } + }); } @Override - public void onError(Status error) { - logger.log( - XdsLogLevel.WARNING, "Received error from xDS client {0}: {1}", xdsClient, error); - if (lb == null) { - lbHelper.helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); - } + public void onError(final Status error) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (shutdown) { + return; + } + logger.log( + XdsLogLevel.WARNING, "Received error from xDS client {0}: {1}", xdsClient, error); + if (lb == null) { + lbHelper.helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error)); + } + } + }); } private void propagateResourceError(Status error) { diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index b6237860e0..e350b64ff7 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -430,6 +430,7 @@ final class XdsNameResolver extends NameResolver { .setServiceConfig(emptyServiceConfig) // let channel take action for no config selector .build(); + private boolean stopped; private Set existingClusters; @Nullable private String rdsResource; @@ -438,38 +439,62 @@ final class XdsNameResolver extends NameResolver { private long httpMaxStreamDurationNano; @Override - public void onChanged(LdsUpdate update) { - logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update); - httpMaxStreamDurationNano = update.getHttpMaxStreamDurationNano(); - List virtualHosts = update.getVirtualHosts(); - String rdsName = update.getRdsName(); - if (rdsName != null && rdsName.equals(rdsResource)) { - return; - } - cleanUpRdsWatcher(); - if (virtualHosts != null) { - updateRoutes(virtualHosts); - } else { - rdsResource = rdsName; - rdsWatcher = new RdsResourceWatcherImpl(); - logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsResource); - xdsClient.watchRdsResource(rdsResource, rdsWatcher); - } + public void onChanged(final LdsUpdate update) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (stopped) { + return; + } + logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update); + httpMaxStreamDurationNano = update.getHttpMaxStreamDurationNano(); + List virtualHosts = update.getVirtualHosts(); + String rdsName = update.getRdsName(); + if (rdsName != null && rdsName.equals(rdsResource)) { + return; + } + cleanUpRdsWatcher(); + if (virtualHosts != null) { + updateRoutes(virtualHosts); + } else { + rdsResource = rdsName; + rdsWatcher = new RdsResourceWatcherImpl(); + logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsResource); + xdsClient.watchRdsResource(rdsResource, rdsWatcher); + } + } + }); } @Override - public void onError(Status error) { - logger.log( - XdsLogLevel.WARNING, - "Received error from xDS client {0}: {1}", xdsClient, error.getDescription()); - listener.onError(error); + public void onError(final Status error) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (stopped) { + return; + } + logger.log( + XdsLogLevel.WARNING, + "Received error from xDS client {0}: {1}", xdsClient, error.getDescription()); + listener.onError(error); + } + }); } @Override - public void onResourceDoesNotExist(String resourceName) { - logger.log(XdsLogLevel.INFO, "LDS resource {0} unavailable", resourceName); - cleanUpRdsWatcher(); - listener.onResult(emptyResult); + public void onResourceDoesNotExist(final String resourceName) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (stopped) { + return; + } + logger.log(XdsLogLevel.INFO, "LDS resource {0} unavailable", resourceName); + cleanUpRdsWatcher(); + listener.onResult(emptyResult); + } + }); } private void start() { @@ -479,6 +504,7 @@ final class XdsNameResolver extends NameResolver { private void stop() { logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", authority); + stopped = true; cleanUpRdsWatcher(); xdsClient.cancelLdsResourceWatch(authority, this); } @@ -550,22 +576,46 @@ final class XdsNameResolver extends NameResolver { private class RdsResourceWatcherImpl implements RdsResourceWatcher { @Override - public void onChanged(RdsUpdate update) { - updateRoutes(update.getVirtualHosts()); + public void onChanged(final RdsUpdate update) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (RdsResourceWatcherImpl.this != rdsWatcher) { + return; + } + updateRoutes(update.getVirtualHosts()); + } + }); } @Override - public void onError(Status error) { - logger.log( - XdsLogLevel.WARNING, - "Received error from xDS client {0}: {1}", xdsClient, error.getDescription()); - listener.onError(error); + public void onError(final Status error) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (RdsResourceWatcherImpl.this != rdsWatcher) { + return; + } + logger.log( + XdsLogLevel.WARNING, + "Received error from xDS client {0}: {1}", xdsClient, error.getDescription()); + listener.onError(error); + } + }); } @Override - public void onResourceDoesNotExist(String resourceName) { - logger.log(XdsLogLevel.INFO, "RDS resource {0} unavailable", resourceName); - listener.onResult(emptyResult); + public void onResourceDoesNotExist(final String resourceName) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (RdsResourceWatcherImpl.this != rdsWatcher) { + return; + } + logger.log(XdsLogLevel.INFO, "RDS resource {0} unavailable", resourceName); + listener.onResult(emptyResult); + } + }); } } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java index 831ff06d5c..4adc6f4400 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancerTest.java @@ -467,6 +467,11 @@ public class CdsLoadBalancerTest { throw new UnsupportedOperationException("should not be called"); } + @Override + public SynchronizationContext getSynchronizationContext() { + return syncContext; + } + @Deprecated @Override public NameResolver.Factory getNameResolverFactory() {