diff --git a/xds/src/main/java/io/grpc/xds/LocalityStore.java b/xds/src/main/java/io/grpc/xds/LocalityStore.java index 41148e0323..183b4bbb7f 100644 --- a/xds/src/main/java/io/grpc/xds/LocalityStore.java +++ b/xds/src/main/java/io/grpc/xds/LocalityStore.java @@ -26,7 +26,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.InternalLogId; @@ -55,7 +54,6 @@ import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -115,8 +113,6 @@ interface LocalityStore { private final OrcaOobUtil orcaOobUtil; private final PriorityManager priorityManager = new PriorityManager(); private final Map localityMap = new HashMap<>(); - // Most current set of localities instructed by traffic director - private Set localities = ImmutableSet.of(); private List dropOverloads = ImmutableList.of(); private long metricsReportIntervalNano = -1; @@ -202,56 +198,20 @@ interface LocalityStore { localityMap.get(locality).shutdown(); } localityMap.clear(); - - for (Locality locality : localities) { - loadStatsStore.removeLocality(locality); - } - localities = ImmutableSet.of(); - priorityManager.reset(); } @Override public void updateLocalityStore(final Map localityInfoMap) { - Set newLocalities = localityInfoMap.keySet(); // TODO: put endPointWeights into attributes for WRR. for (Locality locality : newLocalities) { if (localityMap.containsKey(locality)) { LocalityLbInfo localityLbInfo = localityMap.get(locality); - LocalityLbEndpoints localityLbEndpoints = localityInfoMap.get(locality); - handleEagsOnChildBalancer(helper, localityLbInfo, localityLbEndpoints, locality); + localityLbInfo.refreshEndpoints(localityInfoMap.get(locality)); } } - - for (Locality newLocality : newLocalities) { - if (!localities.contains(newLocality)) { - loadStatsStore.addLocality(newLocality); - } - } - final Set toBeRemovedFromStatsStore = new HashSet<>(); - // There is a race between picking a subchannel and updating localities, which leads to - // the possibility that RPCs will be sent to a removed locality. As a result, those RPC - // loads will not be recorded. We consider this to be natural. By removing locality counters - // after updating subchannel pickers, we eliminate the race and conservatively record loads - // happening in that period. - for (Locality oldLocality : localities) { - if (!localityInfoMap.containsKey(oldLocality)) { - toBeRemovedFromStatsStore.add(oldLocality); - } - } - helper.getSynchronizationContext().execute(new Runnable() { - @Override - public void run() { - for (Locality locality : toBeRemovedFromStatsStore) { - loadStatsStore.removeLocality(locality); - } - } - }); - localities = newLocalities; - priorityManager.updateLocalities(localityInfoMap); - for (Locality oldLocality : localityMap.keySet()) { if (!newLocalities.contains(oldLocality)) { deactivate(oldLocality); @@ -346,14 +306,39 @@ interface LocalityStore { final Locality locality; final LoadBalancer childBalancer; final ChildHelper childHelper; - @Nullable private ScheduledHandle delayedDeletionTimer; - LocalityLbInfo(Locality locality, LoadBalancer childBalancer, ChildHelper childHelper) { + LocalityLbInfo(Locality locality) { this.locality = checkNotNull(locality, "locality"); - this.childBalancer = checkNotNull(childBalancer, "childBalancer"); - this.childHelper = checkNotNull(childHelper, "childHelper"); + loadStatsStore.addLocality(locality); + childHelper = new ChildHelper(); + childBalancer = loadBalancerProvider.newLoadBalancer(childHelper); + } + + void refreshEndpoints(LocalityLbEndpoints localityLbEndpoints) { + final List eags = new ArrayList<>(); + for (LbEndpoint endpoint : localityLbEndpoints.getEndpoints()) { + if (endpoint.isHealthy()) { + eags.add(endpoint.getAddress()); + } + } + // In extreme case handleResolvedAddresses() may trigger updateBalancingState() + // immediately, so execute handleResolvedAddresses() after all the setup in the caller is + // complete. + childHelper.getSynchronizationContext().execute(new Runnable() { + @Override + public void run() { + if (eags.isEmpty() && !childBalancer.canHandleEmptyAddressListFromNameResolution()) { + childBalancer.handleNameResolutionError( + Status.UNAVAILABLE.withDescription( + "Locality " + locality + " has no healthy endpoint")); + } else { + childBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(eags).build()); + } + } + }); } void shutdown() { @@ -362,6 +347,7 @@ interface LocalityStore { delayedDeletionTimer = null; } childBalancer.shutdown(); + loadStatsStore.removeLocality(locality); logger.log(XdsLogLevel.INFO, "Shut down child balancer for locality {0}", locality); } @@ -375,73 +361,62 @@ interface LocalityStore { boolean isDeactivated() { return delayedDeletionTimer != null; } - } - class ChildHelper extends ForwardingLoadBalancerHelper { + class ChildHelper extends ForwardingLoadBalancerHelper { - private final OrcaReportingHelperWrapper orcaReportingHelperWrapper; + private final OrcaReportingHelperWrapper orcaReportingHelperWrapper; + private SubchannelPicker currentChildPicker = XdsSubchannelPickers.BUFFER_PICKER; + private ConnectivityState currentChildState = CONNECTING; - private SubchannelPicker currentChildPicker = XdsSubchannelPickers.BUFFER_PICKER; - private ConnectivityState currentChildState = CONNECTING; + ChildHelper() { + final ClientLoadCounter counter = loadStatsStore.getLocalityCounter(locality); + Helper delegate = new ForwardingLoadBalancerHelper() { + @Override + protected Helper delegate() { + return helper; + } - ChildHelper(final Locality locality, final ClientLoadCounter counter, - OrcaOobUtil orcaOobUtil) { - checkNotNull(locality, "locality"); - checkNotNull(counter, "counter"); - checkNotNull(orcaOobUtil, "orcaOobUtil"); - Helper delegate = new ForwardingLoadBalancerHelper() { - @Override - protected Helper delegate() { - return helper; + @Override + public void updateBalancingState( + ConnectivityState newState, SubchannelPicker newPicker) { + logger.log( + XdsLogLevel.INFO, + "Update load balancing state for locality {0} to {1}", locality, newState); + currentChildState = newState; + currentChildPicker = + new LoadRecordingSubchannelPicker( + counter, + new MetricsObservingSubchannelPicker(new MetricsRecordingListener(counter), + newPicker, orcaPerRequestUtil)); + + priorityManager.updatePriorityState(priorityManager.getPriority(locality)); + } + + @Override + public String getAuthority() { + //FIXME: This should be a new proposed field of Locality, locality_name + return locality.getSubZone(); + } + }; + + orcaReportingHelperWrapper = + orcaOobUtil.newOrcaReportingHelperWrapper( + delegate, new MetricsRecordingListener(counter)); + if (metricsReportIntervalNano > 0) { + updateMetricsReportInterval(metricsReportIntervalNano); } - - @Override - public void updateBalancingState(ConnectivityState newState, - final SubchannelPicker newPicker) { - checkNotNull(newState, "newState"); - checkNotNull(newPicker, "newPicker"); - logger.log( - XdsLogLevel.INFO, - "Update load balancing state for locality {0} to {1}", locality, newState); - currentChildState = newState; - currentChildPicker = - new LoadRecordingSubchannelPicker(counter, - new MetricsObservingSubchannelPicker(new MetricsRecordingListener(counter), - newPicker, orcaPerRequestUtil)); - - // delegate to parent helper - priorityManager.updatePriorityState(priorityManager.getPriority(locality)); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this).add("locality", locality).toString(); - } - - @Override - public String getAuthority() { - //FIXME: This should be a new proposed field of Locality, locality_name - return locality.getSubZone(); - } - }; - orcaReportingHelperWrapper = - checkNotNull(orcaOobUtil, "orcaOobUtil") - .newOrcaReportingHelperWrapper(delegate, new MetricsRecordingListener(counter)); - - if (metricsReportIntervalNano > 0) { - updateMetricsReportInterval(metricsReportIntervalNano); } - } - void updateMetricsReportInterval(long intervalNanos) { - orcaReportingHelperWrapper - .setReportingConfig(OrcaReportingConfig.newBuilder() - .setReportInterval(intervalNanos, TimeUnit.NANOSECONDS).build()); - } + void updateMetricsReportInterval(long intervalNanos) { + orcaReportingHelperWrapper + .setReportingConfig(OrcaReportingConfig.newBuilder() + .setReportInterval(intervalNanos, TimeUnit.NANOSECONDS).build()); + } - @Override - protected Helper delegate() { - return orcaReportingHelperWrapper.asHelper(); + @Override + protected Helper delegate() { + return orcaReportingHelperWrapper.asHelper(); + } } } @@ -594,47 +569,10 @@ interface LocalityStore { private void initLocality(Locality locality) { logger.log(XdsLogLevel.INFO, "Create child balancer for locality {0}", locality); - ChildHelper childHelper = - new ChildHelper(locality, loadStatsStore.getLocalityCounter(locality), - orcaOobUtil); - LocalityLbInfo localityLbInfo = - new LocalityLbInfo( - locality, - loadBalancerProvider.newLoadBalancer(childHelper), - childHelper); + LocalityLbInfo localityLbInfo = new LocalityLbInfo(locality); localityMap.put(locality, localityLbInfo); - LocalityLbEndpoints localityLbEndpoints = localityInfoMap.get(locality); - handleEagsOnChildBalancer(childHelper, localityLbInfo, localityLbEndpoints, locality); + localityLbInfo.refreshEndpoints(localityInfoMap.get(locality)); } } - - private static void handleEagsOnChildBalancer( - Helper childHelper, final LocalityLbInfo localityLbInfo, - final LocalityLbEndpoints localityLbEndpoints, final Locality locality) { - final List eags = new ArrayList<>(); - for (LbEndpoint endpoint : localityLbEndpoints.getEndpoints()) { - if (endpoint.isHealthy()) { - eags.add(endpoint.getAddress()); - } - } - // In extreme case handleResolvedAddresses() may trigger updateBalancingState() - // immediately, so execute handleResolvedAddresses() after all the setup in the caller is - // complete. - childHelper.getSynchronizationContext().execute(new Runnable() { - @Override - public void run() { - if (eags.isEmpty() - && !localityLbInfo.childBalancer.canHandleEmptyAddressListFromNameResolution()) { - localityLbInfo.childBalancer.handleNameResolutionError( - Status.UNAVAILABLE.withDescription( - "Locality " + locality + " has no healthy endpoint")); - } else { - localityLbInfo.childBalancer - .handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(eags).build()); - } - } - }); - } } } diff --git a/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java b/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java index 52c11b4cc8..4f9c8f5175 100644 --- a/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java +++ b/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java @@ -235,39 +235,6 @@ public class LocalityStoreTest { orcaPerRequestUtil, orcaOobUtil); } - @Test - @SuppressWarnings("unchecked") - public void updateLocalityStore_updateStatsStoreLocalityTracking() { - Map localityInfoMap = new HashMap<>(); - localityInfoMap - .put(locality1, - new LocalityLbEndpoints(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1, 0)); - localityInfoMap - .put(locality2, - new LocalityLbEndpoints(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2, 0)); - localityStore.updateLocalityStore(ImmutableMap.copyOf(localityInfoMap)); - verify(loadStatsStore).addLocality(locality1); - verify(loadStatsStore).addLocality(locality2); - - localityInfoMap - .put(locality3, - new LocalityLbEndpoints(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3, 0)); - localityStore.updateLocalityStore(ImmutableMap.copyOf(localityInfoMap)); - verify(loadStatsStore).addLocality(locality3); - - localityInfoMap = ImmutableMap - .of(locality4, - new LocalityLbEndpoints(ImmutableList.of(lbEndpoint41, lbEndpoint42), 4, 0)); - localityStore.updateLocalityStore(ImmutableMap.copyOf(localityInfoMap)); - verify(loadStatsStore).removeLocality(locality1); - verify(loadStatsStore).removeLocality(locality2); - verify(loadStatsStore).removeLocality(locality3); - verify(loadStatsStore).addLocality(locality4); - - localityStore.updateLocalityStore(ImmutableMap.copyOf(Collections.EMPTY_MAP)); - verify(loadStatsStore).removeLocality(locality4); - } - @Test public void updateLocalityStore_pickResultInterceptedForLoadRecordingWhenSubchannelReady() { // Simulate receiving two localities.