From c777e0856327de03225bb3bdd9660001950f87e3 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Sat, 6 Jun 2020 01:02:15 +0000 Subject: [PATCH] xds: retain locality stats counter when the child balancer for that locality is deactivated (#7096) Create the counter for recording per locality stats upon creating the child balancer for that locality. When the locality is deactivated (due to EDS response update removes it), the counter is not deleted from the LoadStatsStore. Delete it when the child balancer for that locality is shut down. In this way, the lifecycle of the load stats counter for a certain locality stays same with the child balancer for that locality. This is exactly what will happen after we refactor LocalityStore to PriorityLoadBalancer and LrsLoadBalancer (i.e., when some priority is deactivated, its subtree is not deleted immediately, so the LrsLoadBalancer instances for localities still hold the load stats counters). --- .../main/java/io/grpc/xds/LocalityStore.java | 224 +++++++----------- .../java/io/grpc/xds/LocalityStoreTest.java | 33 --- 2 files changed, 81 insertions(+), 176 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/LocalityStore.java b/xds/src/main/java/io/grpc/xds/LocalityStore.java index 41148e0323..183b4bbb7f 100644 --- a/xds/src/main/java/io/grpc/xds/LocalityStore.java +++ b/xds/src/main/java/io/grpc/xds/LocalityStore.java @@ -26,7 +26,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; import io.grpc.InternalLogId; @@ -55,7 +54,6 @@ import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -115,8 +113,6 @@ interface LocalityStore { private final OrcaOobUtil orcaOobUtil; private final PriorityManager priorityManager = new PriorityManager(); private final Map localityMap = new HashMap<>(); - // Most current set of localities instructed by traffic director - private Set localities = ImmutableSet.of(); private List dropOverloads = ImmutableList.of(); private long metricsReportIntervalNano = -1; @@ -202,56 +198,20 @@ interface LocalityStore { localityMap.get(locality).shutdown(); } localityMap.clear(); - - for (Locality locality : localities) { - loadStatsStore.removeLocality(locality); - } - localities = ImmutableSet.of(); - priorityManager.reset(); } @Override public void updateLocalityStore(final Map localityInfoMap) { - Set newLocalities = localityInfoMap.keySet(); // TODO: put endPointWeights into attributes for WRR. for (Locality locality : newLocalities) { if (localityMap.containsKey(locality)) { LocalityLbInfo localityLbInfo = localityMap.get(locality); - LocalityLbEndpoints localityLbEndpoints = localityInfoMap.get(locality); - handleEagsOnChildBalancer(helper, localityLbInfo, localityLbEndpoints, locality); + localityLbInfo.refreshEndpoints(localityInfoMap.get(locality)); } } - - for (Locality newLocality : newLocalities) { - if (!localities.contains(newLocality)) { - loadStatsStore.addLocality(newLocality); - } - } - final Set toBeRemovedFromStatsStore = new HashSet<>(); - // There is a race between picking a subchannel and updating localities, which leads to - // the possibility that RPCs will be sent to a removed locality. As a result, those RPC - // loads will not be recorded. We consider this to be natural. By removing locality counters - // after updating subchannel pickers, we eliminate the race and conservatively record loads - // happening in that period. - for (Locality oldLocality : localities) { - if (!localityInfoMap.containsKey(oldLocality)) { - toBeRemovedFromStatsStore.add(oldLocality); - } - } - helper.getSynchronizationContext().execute(new Runnable() { - @Override - public void run() { - for (Locality locality : toBeRemovedFromStatsStore) { - loadStatsStore.removeLocality(locality); - } - } - }); - localities = newLocalities; - priorityManager.updateLocalities(localityInfoMap); - for (Locality oldLocality : localityMap.keySet()) { if (!newLocalities.contains(oldLocality)) { deactivate(oldLocality); @@ -346,14 +306,39 @@ interface LocalityStore { final Locality locality; final LoadBalancer childBalancer; final ChildHelper childHelper; - @Nullable private ScheduledHandle delayedDeletionTimer; - LocalityLbInfo(Locality locality, LoadBalancer childBalancer, ChildHelper childHelper) { + LocalityLbInfo(Locality locality) { this.locality = checkNotNull(locality, "locality"); - this.childBalancer = checkNotNull(childBalancer, "childBalancer"); - this.childHelper = checkNotNull(childHelper, "childHelper"); + loadStatsStore.addLocality(locality); + childHelper = new ChildHelper(); + childBalancer = loadBalancerProvider.newLoadBalancer(childHelper); + } + + void refreshEndpoints(LocalityLbEndpoints localityLbEndpoints) { + final List eags = new ArrayList<>(); + for (LbEndpoint endpoint : localityLbEndpoints.getEndpoints()) { + if (endpoint.isHealthy()) { + eags.add(endpoint.getAddress()); + } + } + // In extreme case handleResolvedAddresses() may trigger updateBalancingState() + // immediately, so execute handleResolvedAddresses() after all the setup in the caller is + // complete. + childHelper.getSynchronizationContext().execute(new Runnable() { + @Override + public void run() { + if (eags.isEmpty() && !childBalancer.canHandleEmptyAddressListFromNameResolution()) { + childBalancer.handleNameResolutionError( + Status.UNAVAILABLE.withDescription( + "Locality " + locality + " has no healthy endpoint")); + } else { + childBalancer.handleResolvedAddresses( + ResolvedAddresses.newBuilder().setAddresses(eags).build()); + } + } + }); } void shutdown() { @@ -362,6 +347,7 @@ interface LocalityStore { delayedDeletionTimer = null; } childBalancer.shutdown(); + loadStatsStore.removeLocality(locality); logger.log(XdsLogLevel.INFO, "Shut down child balancer for locality {0}", locality); } @@ -375,73 +361,62 @@ interface LocalityStore { boolean isDeactivated() { return delayedDeletionTimer != null; } - } - class ChildHelper extends ForwardingLoadBalancerHelper { + class ChildHelper extends ForwardingLoadBalancerHelper { - private final OrcaReportingHelperWrapper orcaReportingHelperWrapper; + private final OrcaReportingHelperWrapper orcaReportingHelperWrapper; + private SubchannelPicker currentChildPicker = XdsSubchannelPickers.BUFFER_PICKER; + private ConnectivityState currentChildState = CONNECTING; - private SubchannelPicker currentChildPicker = XdsSubchannelPickers.BUFFER_PICKER; - private ConnectivityState currentChildState = CONNECTING; + ChildHelper() { + final ClientLoadCounter counter = loadStatsStore.getLocalityCounter(locality); + Helper delegate = new ForwardingLoadBalancerHelper() { + @Override + protected Helper delegate() { + return helper; + } - ChildHelper(final Locality locality, final ClientLoadCounter counter, - OrcaOobUtil orcaOobUtil) { - checkNotNull(locality, "locality"); - checkNotNull(counter, "counter"); - checkNotNull(orcaOobUtil, "orcaOobUtil"); - Helper delegate = new ForwardingLoadBalancerHelper() { - @Override - protected Helper delegate() { - return helper; + @Override + public void updateBalancingState( + ConnectivityState newState, SubchannelPicker newPicker) { + logger.log( + XdsLogLevel.INFO, + "Update load balancing state for locality {0} to {1}", locality, newState); + currentChildState = newState; + currentChildPicker = + new LoadRecordingSubchannelPicker( + counter, + new MetricsObservingSubchannelPicker(new MetricsRecordingListener(counter), + newPicker, orcaPerRequestUtil)); + + priorityManager.updatePriorityState(priorityManager.getPriority(locality)); + } + + @Override + public String getAuthority() { + //FIXME: This should be a new proposed field of Locality, locality_name + return locality.getSubZone(); + } + }; + + orcaReportingHelperWrapper = + orcaOobUtil.newOrcaReportingHelperWrapper( + delegate, new MetricsRecordingListener(counter)); + if (metricsReportIntervalNano > 0) { + updateMetricsReportInterval(metricsReportIntervalNano); } - - @Override - public void updateBalancingState(ConnectivityState newState, - final SubchannelPicker newPicker) { - checkNotNull(newState, "newState"); - checkNotNull(newPicker, "newPicker"); - logger.log( - XdsLogLevel.INFO, - "Update load balancing state for locality {0} to {1}", locality, newState); - currentChildState = newState; - currentChildPicker = - new LoadRecordingSubchannelPicker(counter, - new MetricsObservingSubchannelPicker(new MetricsRecordingListener(counter), - newPicker, orcaPerRequestUtil)); - - // delegate to parent helper - priorityManager.updatePriorityState(priorityManager.getPriority(locality)); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this).add("locality", locality).toString(); - } - - @Override - public String getAuthority() { - //FIXME: This should be a new proposed field of Locality, locality_name - return locality.getSubZone(); - } - }; - orcaReportingHelperWrapper = - checkNotNull(orcaOobUtil, "orcaOobUtil") - .newOrcaReportingHelperWrapper(delegate, new MetricsRecordingListener(counter)); - - if (metricsReportIntervalNano > 0) { - updateMetricsReportInterval(metricsReportIntervalNano); } - } - void updateMetricsReportInterval(long intervalNanos) { - orcaReportingHelperWrapper - .setReportingConfig(OrcaReportingConfig.newBuilder() - .setReportInterval(intervalNanos, TimeUnit.NANOSECONDS).build()); - } + void updateMetricsReportInterval(long intervalNanos) { + orcaReportingHelperWrapper + .setReportingConfig(OrcaReportingConfig.newBuilder() + .setReportInterval(intervalNanos, TimeUnit.NANOSECONDS).build()); + } - @Override - protected Helper delegate() { - return orcaReportingHelperWrapper.asHelper(); + @Override + protected Helper delegate() { + return orcaReportingHelperWrapper.asHelper(); + } } } @@ -594,47 +569,10 @@ interface LocalityStore { private void initLocality(Locality locality) { logger.log(XdsLogLevel.INFO, "Create child balancer for locality {0}", locality); - ChildHelper childHelper = - new ChildHelper(locality, loadStatsStore.getLocalityCounter(locality), - orcaOobUtil); - LocalityLbInfo localityLbInfo = - new LocalityLbInfo( - locality, - loadBalancerProvider.newLoadBalancer(childHelper), - childHelper); + LocalityLbInfo localityLbInfo = new LocalityLbInfo(locality); localityMap.put(locality, localityLbInfo); - LocalityLbEndpoints localityLbEndpoints = localityInfoMap.get(locality); - handleEagsOnChildBalancer(childHelper, localityLbInfo, localityLbEndpoints, locality); + localityLbInfo.refreshEndpoints(localityInfoMap.get(locality)); } } - - private static void handleEagsOnChildBalancer( - Helper childHelper, final LocalityLbInfo localityLbInfo, - final LocalityLbEndpoints localityLbEndpoints, final Locality locality) { - final List eags = new ArrayList<>(); - for (LbEndpoint endpoint : localityLbEndpoints.getEndpoints()) { - if (endpoint.isHealthy()) { - eags.add(endpoint.getAddress()); - } - } - // In extreme case handleResolvedAddresses() may trigger updateBalancingState() - // immediately, so execute handleResolvedAddresses() after all the setup in the caller is - // complete. - childHelper.getSynchronizationContext().execute(new Runnable() { - @Override - public void run() { - if (eags.isEmpty() - && !localityLbInfo.childBalancer.canHandleEmptyAddressListFromNameResolution()) { - localityLbInfo.childBalancer.handleNameResolutionError( - Status.UNAVAILABLE.withDescription( - "Locality " + locality + " has no healthy endpoint")); - } else { - localityLbInfo.childBalancer - .handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(eags).build()); - } - } - }); - } } } diff --git a/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java b/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java index 52c11b4cc8..4f9c8f5175 100644 --- a/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java +++ b/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java @@ -235,39 +235,6 @@ public class LocalityStoreTest { orcaPerRequestUtil, orcaOobUtil); } - @Test - @SuppressWarnings("unchecked") - public void updateLocalityStore_updateStatsStoreLocalityTracking() { - Map localityInfoMap = new HashMap<>(); - localityInfoMap - .put(locality1, - new LocalityLbEndpoints(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1, 0)); - localityInfoMap - .put(locality2, - new LocalityLbEndpoints(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2, 0)); - localityStore.updateLocalityStore(ImmutableMap.copyOf(localityInfoMap)); - verify(loadStatsStore).addLocality(locality1); - verify(loadStatsStore).addLocality(locality2); - - localityInfoMap - .put(locality3, - new LocalityLbEndpoints(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3, 0)); - localityStore.updateLocalityStore(ImmutableMap.copyOf(localityInfoMap)); - verify(loadStatsStore).addLocality(locality3); - - localityInfoMap = ImmutableMap - .of(locality4, - new LocalityLbEndpoints(ImmutableList.of(lbEndpoint41, lbEndpoint42), 4, 0)); - localityStore.updateLocalityStore(ImmutableMap.copyOf(localityInfoMap)); - verify(loadStatsStore).removeLocality(locality1); - verify(loadStatsStore).removeLocality(locality2); - verify(loadStatsStore).removeLocality(locality3); - verify(loadStatsStore).addLocality(locality4); - - localityStore.updateLocalityStore(ImmutableMap.copyOf(Collections.EMPTY_MAP)); - verify(loadStatsStore).removeLocality(locality4); - } - @Test public void updateLocalityStore_pickResultInterceptedForLoadRecordingWhenSubchannelReady() { // Simulate receiving two localities.