xds: retain locality stats counter when the child balancer for that locality is deactivated (#7096)

Create the counter for recording per locality stats upon creating the child balancer for that locality. When the locality is deactivated (due to EDS response update removes it), the counter is not deleted from the LoadStatsStore. Delete it when the child balancer for that locality is shut down. In this way, the lifecycle of the load stats counter for a certain locality stays same with the child balancer for that locality. This is exactly what will happen after we refactor LocalityStore to PriorityLoadBalancer and LrsLoadBalancer (i.e., when some priority is deactivated, its subtree is not deleted immediately, so the LrsLoadBalancer instances for localities still hold the load stats counters).
This commit is contained in:
Chengyuan Zhang 2020-06-06 01:02:15 +00:00 committed by GitHub
parent 959769aad8
commit c777e08563
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 81 additions and 176 deletions

View File

@ -26,7 +26,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
@ -55,7 +54,6 @@ import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -115,8 +113,6 @@ interface LocalityStore {
private final OrcaOobUtil orcaOobUtil;
private final PriorityManager priorityManager = new PriorityManager();
private final Map<Locality, LocalityLbInfo> localityMap = new HashMap<>();
// Most current set of localities instructed by traffic director
private Set<Locality> localities = ImmutableSet.of();
private List<DropOverload> dropOverloads = ImmutableList.of();
private long metricsReportIntervalNano = -1;
@ -202,56 +198,20 @@ interface LocalityStore {
localityMap.get(locality).shutdown();
}
localityMap.clear();
for (Locality locality : localities) {
loadStatsStore.removeLocality(locality);
}
localities = ImmutableSet.of();
priorityManager.reset();
}
@Override
public void updateLocalityStore(final Map<Locality, LocalityLbEndpoints> localityInfoMap) {
Set<Locality> newLocalities = localityInfoMap.keySet();
// TODO: put endPointWeights into attributes for WRR.
for (Locality locality : newLocalities) {
if (localityMap.containsKey(locality)) {
LocalityLbInfo localityLbInfo = localityMap.get(locality);
LocalityLbEndpoints localityLbEndpoints = localityInfoMap.get(locality);
handleEagsOnChildBalancer(helper, localityLbInfo, localityLbEndpoints, locality);
localityLbInfo.refreshEndpoints(localityInfoMap.get(locality));
}
}
for (Locality newLocality : newLocalities) {
if (!localities.contains(newLocality)) {
loadStatsStore.addLocality(newLocality);
}
}
final Set<Locality> toBeRemovedFromStatsStore = new HashSet<>();
// There is a race between picking a subchannel and updating localities, which leads to
// the possibility that RPCs will be sent to a removed locality. As a result, those RPC
// loads will not be recorded. We consider this to be natural. By removing locality counters
// after updating subchannel pickers, we eliminate the race and conservatively record loads
// happening in that period.
for (Locality oldLocality : localities) {
if (!localityInfoMap.containsKey(oldLocality)) {
toBeRemovedFromStatsStore.add(oldLocality);
}
}
helper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
for (Locality locality : toBeRemovedFromStatsStore) {
loadStatsStore.removeLocality(locality);
}
}
});
localities = newLocalities;
priorityManager.updateLocalities(localityInfoMap);
for (Locality oldLocality : localityMap.keySet()) {
if (!newLocalities.contains(oldLocality)) {
deactivate(oldLocality);
@ -346,14 +306,39 @@ interface LocalityStore {
final Locality locality;
final LoadBalancer childBalancer;
final ChildHelper childHelper;
@Nullable
private ScheduledHandle delayedDeletionTimer;
LocalityLbInfo(Locality locality, LoadBalancer childBalancer, ChildHelper childHelper) {
LocalityLbInfo(Locality locality) {
this.locality = checkNotNull(locality, "locality");
this.childBalancer = checkNotNull(childBalancer, "childBalancer");
this.childHelper = checkNotNull(childHelper, "childHelper");
loadStatsStore.addLocality(locality);
childHelper = new ChildHelper();
childBalancer = loadBalancerProvider.newLoadBalancer(childHelper);
}
void refreshEndpoints(LocalityLbEndpoints localityLbEndpoints) {
final List<EquivalentAddressGroup> eags = new ArrayList<>();
for (LbEndpoint endpoint : localityLbEndpoints.getEndpoints()) {
if (endpoint.isHealthy()) {
eags.add(endpoint.getAddress());
}
}
// In extreme case handleResolvedAddresses() may trigger updateBalancingState()
// immediately, so execute handleResolvedAddresses() after all the setup in the caller is
// complete.
childHelper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
if (eags.isEmpty() && !childBalancer.canHandleEmptyAddressListFromNameResolution()) {
childBalancer.handleNameResolutionError(
Status.UNAVAILABLE.withDescription(
"Locality " + locality + " has no healthy endpoint"));
} else {
childBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(eags).build());
}
}
});
}
void shutdown() {
@ -362,6 +347,7 @@ interface LocalityStore {
delayedDeletionTimer = null;
}
childBalancer.shutdown();
loadStatsStore.removeLocality(locality);
logger.log(XdsLogLevel.INFO, "Shut down child balancer for locality {0}", locality);
}
@ -375,20 +361,15 @@ interface LocalityStore {
boolean isDeactivated() {
return delayedDeletionTimer != null;
}
}
class ChildHelper extends ForwardingLoadBalancerHelper {
private final OrcaReportingHelperWrapper orcaReportingHelperWrapper;
private SubchannelPicker currentChildPicker = XdsSubchannelPickers.BUFFER_PICKER;
private ConnectivityState currentChildState = CONNECTING;
ChildHelper(final Locality locality, final ClientLoadCounter counter,
OrcaOobUtil orcaOobUtil) {
checkNotNull(locality, "locality");
checkNotNull(counter, "counter");
checkNotNull(orcaOobUtil, "orcaOobUtil");
ChildHelper() {
final ClientLoadCounter counter = loadStatsStore.getLocalityCounter(locality);
Helper delegate = new ForwardingLoadBalancerHelper() {
@Override
protected Helper delegate() {
@ -396,38 +377,31 @@ interface LocalityStore {
}
@Override
public void updateBalancingState(ConnectivityState newState,
final SubchannelPicker newPicker) {
checkNotNull(newState, "newState");
checkNotNull(newPicker, "newPicker");
public void updateBalancingState(
ConnectivityState newState, SubchannelPicker newPicker) {
logger.log(
XdsLogLevel.INFO,
"Update load balancing state for locality {0} to {1}", locality, newState);
currentChildState = newState;
currentChildPicker =
new LoadRecordingSubchannelPicker(counter,
new LoadRecordingSubchannelPicker(
counter,
new MetricsObservingSubchannelPicker(new MetricsRecordingListener(counter),
newPicker, orcaPerRequestUtil));
// delegate to parent helper
priorityManager.updatePriorityState(priorityManager.getPriority(locality));
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("locality", locality).toString();
}
@Override
public String getAuthority() {
//FIXME: This should be a new proposed field of Locality, locality_name
return locality.getSubZone();
}
};
orcaReportingHelperWrapper =
checkNotNull(orcaOobUtil, "orcaOobUtil")
.newOrcaReportingHelperWrapper(delegate, new MetricsRecordingListener(counter));
orcaReportingHelperWrapper =
orcaOobUtil.newOrcaReportingHelperWrapper(
delegate, new MetricsRecordingListener(counter));
if (metricsReportIntervalNano > 0) {
updateMetricsReportInterval(metricsReportIntervalNano);
}
@ -444,6 +418,7 @@ interface LocalityStore {
return orcaReportingHelperWrapper.asHelper();
}
}
}
private final class PriorityManager {
@ -594,47 +569,10 @@ interface LocalityStore {
private void initLocality(Locality locality) {
logger.log(XdsLogLevel.INFO, "Create child balancer for locality {0}", locality);
ChildHelper childHelper =
new ChildHelper(locality, loadStatsStore.getLocalityCounter(locality),
orcaOobUtil);
LocalityLbInfo localityLbInfo =
new LocalityLbInfo(
locality,
loadBalancerProvider.newLoadBalancer(childHelper),
childHelper);
LocalityLbInfo localityLbInfo = new LocalityLbInfo(locality);
localityMap.put(locality, localityLbInfo);
LocalityLbEndpoints localityLbEndpoints = localityInfoMap.get(locality);
handleEagsOnChildBalancer(childHelper, localityLbInfo, localityLbEndpoints, locality);
localityLbInfo.refreshEndpoints(localityInfoMap.get(locality));
}
}
private static void handleEagsOnChildBalancer(
Helper childHelper, final LocalityLbInfo localityLbInfo,
final LocalityLbEndpoints localityLbEndpoints, final Locality locality) {
final List<EquivalentAddressGroup> eags = new ArrayList<>();
for (LbEndpoint endpoint : localityLbEndpoints.getEndpoints()) {
if (endpoint.isHealthy()) {
eags.add(endpoint.getAddress());
}
}
// In extreme case handleResolvedAddresses() may trigger updateBalancingState()
// immediately, so execute handleResolvedAddresses() after all the setup in the caller is
// complete.
childHelper.getSynchronizationContext().execute(new Runnable() {
@Override
public void run() {
if (eags.isEmpty()
&& !localityLbInfo.childBalancer.canHandleEmptyAddressListFromNameResolution()) {
localityLbInfo.childBalancer.handleNameResolutionError(
Status.UNAVAILABLE.withDescription(
"Locality " + locality + " has no healthy endpoint"));
} else {
localityLbInfo.childBalancer
.handleResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(eags).build());
}
}
});
}
}
}

View File

@ -235,39 +235,6 @@ public class LocalityStoreTest {
orcaPerRequestUtil, orcaOobUtil);
}
@Test
@SuppressWarnings("unchecked")
public void updateLocalityStore_updateStatsStoreLocalityTracking() {
Map<Locality, LocalityLbEndpoints> localityInfoMap = new HashMap<>();
localityInfoMap
.put(locality1,
new LocalityLbEndpoints(ImmutableList.of(lbEndpoint11, lbEndpoint12), 1, 0));
localityInfoMap
.put(locality2,
new LocalityLbEndpoints(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2, 0));
localityStore.updateLocalityStore(ImmutableMap.copyOf(localityInfoMap));
verify(loadStatsStore).addLocality(locality1);
verify(loadStatsStore).addLocality(locality2);
localityInfoMap
.put(locality3,
new LocalityLbEndpoints(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3, 0));
localityStore.updateLocalityStore(ImmutableMap.copyOf(localityInfoMap));
verify(loadStatsStore).addLocality(locality3);
localityInfoMap = ImmutableMap
.of(locality4,
new LocalityLbEndpoints(ImmutableList.of(lbEndpoint41, lbEndpoint42), 4, 0));
localityStore.updateLocalityStore(ImmutableMap.copyOf(localityInfoMap));
verify(loadStatsStore).removeLocality(locality1);
verify(loadStatsStore).removeLocality(locality2);
verify(loadStatsStore).removeLocality(locality3);
verify(loadStatsStore).addLocality(locality4);
localityStore.updateLocalityStore(ImmutableMap.copyOf(Collections.EMPTY_MAP));
verify(loadStatsStore).removeLocality(locality4);
}
@Test
public void updateLocalityStore_pickResultInterceptedForLoadRecordingWhenSubchannelReady() {
// Simulate receiving two localities.