mirror of https://github.com/grpc/grpc-java.git
xds: Distinct LoadStatManagers (#10009)
Currently the code maintains one LoadStatsManager2 that collects all stats. The problem with this is that in a federation situation there will be multiple LrsClients that will be periodically picking up stats from the manager and sending them to their respective control planes. This creates a first-come-first-serve situation where the stats get randomly distributed across the control planes. This change creates separate LoadStatsManagers dedicated to their own control planes, thus assuring no stats will get lost.
This commit is contained in:
parent
ec9b8e0d61
commit
6d75fca23f
|
|
@ -61,7 +61,8 @@ final class LoadReportClient {
|
||||||
private final ScheduledExecutorService timerService;
|
private final ScheduledExecutorService timerService;
|
||||||
private final Stopwatch retryStopwatch;
|
private final Stopwatch retryStopwatch;
|
||||||
private final BackoffPolicy.Provider backoffPolicyProvider;
|
private final BackoffPolicy.Provider backoffPolicyProvider;
|
||||||
private final LoadStatsManager2 loadStatsManager;
|
@VisibleForTesting
|
||||||
|
final LoadStatsManager2 loadStatsManager;
|
||||||
|
|
||||||
private boolean started;
|
private boolean started;
|
||||||
@Nullable
|
@Nullable
|
||||||
|
|
|
||||||
|
|
@ -98,7 +98,7 @@ final class XdsClientImpl extends XdsClient
|
||||||
Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
|
Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
|
||||||
resourceSubscribers = new HashMap<>();
|
resourceSubscribers = new HashMap<>();
|
||||||
private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
|
private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
|
||||||
private final LoadStatsManager2 loadStatsManager;
|
private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>();
|
||||||
private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
|
private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
|
||||||
private final XdsChannelFactory xdsChannelFactory;
|
private final XdsChannelFactory xdsChannelFactory;
|
||||||
private final Bootstrapper.BootstrapInfo bootstrapInfo;
|
private final Bootstrapper.BootstrapInfo bootstrapInfo;
|
||||||
|
|
@ -125,7 +125,6 @@ final class XdsClientImpl extends XdsClient
|
||||||
this.bootstrapInfo = bootstrapInfo;
|
this.bootstrapInfo = bootstrapInfo;
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.timeService = timeService;
|
this.timeService = timeService;
|
||||||
loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
|
|
||||||
this.backoffPolicyProvider = backoffPolicyProvider;
|
this.backoffPolicyProvider = backoffPolicyProvider;
|
||||||
this.stopwatchSupplier = stopwatchSupplier;
|
this.stopwatchSupplier = stopwatchSupplier;
|
||||||
this.timeProvider = timeProvider;
|
this.timeProvider = timeProvider;
|
||||||
|
|
@ -155,6 +154,8 @@ final class XdsClientImpl extends XdsClient
|
||||||
backoffPolicyProvider,
|
backoffPolicyProvider,
|
||||||
stopwatchSupplier,
|
stopwatchSupplier,
|
||||||
this);
|
this);
|
||||||
|
LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
|
||||||
|
loadStatsManagerMap.put(serverInfo, loadStatsManager);
|
||||||
LoadReportClient lrsClient = new LoadReportClient(
|
LoadReportClient lrsClient = new LoadReportClient(
|
||||||
loadStatsManager, xdsChannel.channel(), context, bootstrapInfo.node(), syncContext,
|
loadStatsManager, xdsChannel.channel(), context, bootstrapInfo.node(), syncContext,
|
||||||
timeService, backoffPolicyProvider, stopwatchSupplier);
|
timeService, backoffPolicyProvider, stopwatchSupplier);
|
||||||
|
|
@ -342,6 +343,7 @@ final class XdsClientImpl extends XdsClient
|
||||||
@Override
|
@Override
|
||||||
ClusterDropStats addClusterDropStats(
|
ClusterDropStats addClusterDropStats(
|
||||||
final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName) {
|
final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName) {
|
||||||
|
LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
|
||||||
ClusterDropStats dropCounter =
|
ClusterDropStats dropCounter =
|
||||||
loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
|
loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
|
||||||
syncContext.execute(new Runnable() {
|
syncContext.execute(new Runnable() {
|
||||||
|
|
@ -357,6 +359,7 @@ final class XdsClientImpl extends XdsClient
|
||||||
ClusterLocalityStats addClusterLocalityStats(
|
ClusterLocalityStats addClusterLocalityStats(
|
||||||
final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
|
final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
|
||||||
Locality locality) {
|
Locality locality) {
|
||||||
|
LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
|
||||||
ClusterLocalityStats loadCounter =
|
ClusterLocalityStats loadCounter =
|
||||||
loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality);
|
loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality);
|
||||||
syncContext.execute(new Runnable() {
|
syncContext.execute(new Runnable() {
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ import io.grpc.xds.Filter.NamedFilterConfig;
|
||||||
import io.grpc.xds.XdsClient.ResourceWatcher;
|
import io.grpc.xds.XdsClient.ResourceWatcher;
|
||||||
import io.grpc.xds.XdsListenerResource.LdsUpdate;
|
import io.grpc.xds.XdsListenerResource.LdsUpdate;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
@ -174,6 +175,34 @@ public class XdsClientFederationTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assures that {@link LoadReportClient}s have distinct {@link LoadStatsManager2}s so that they
|
||||||
|
* only report on the traffic for their own control plane.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void lrsClientsHaveDistinctLoadStatsManagers() throws InterruptedException {
|
||||||
|
trafficdirector.setLdsConfig(ControlPlaneRule.buildServerListener(),
|
||||||
|
ControlPlaneRule.buildClientListener("test-server"));
|
||||||
|
directpathPa.setLdsConfig(ControlPlaneRule.buildServerListener(),
|
||||||
|
ControlPlaneRule.buildClientListener(
|
||||||
|
"xdstp://server-one/envoy.config.listener.v3.Listener/test-server"));
|
||||||
|
|
||||||
|
xdsClient.watchXdsResource(XdsListenerResource.getInstance(), "test-server", mockWatcher);
|
||||||
|
xdsClient.watchXdsResource(XdsListenerResource.getInstance(),
|
||||||
|
"xdstp://server-one/envoy.config.listener.v3.Listener/test-server", mockDirectPathWatcher);
|
||||||
|
|
||||||
|
// With two control planes and a watcher for each, there should be two LRS clients.
|
||||||
|
assertThat(xdsClient.getServerLrsClientMap().size()).isEqualTo(2);
|
||||||
|
|
||||||
|
// Collect the LoadStatManagers and make sure they are distinct for each control plane.
|
||||||
|
HashSet<LoadStatsManager2> loadStatManagers = new HashSet<>();
|
||||||
|
for (Entry<ServerInfo, LoadReportClient> entry : xdsClient.getServerLrsClientMap().entrySet()) {
|
||||||
|
xdsClient.addClusterDropStats(entry.getKey(), "clusterName", "edsServiceName");
|
||||||
|
loadStatManagers.add(entry.getValue().loadStatsManager);
|
||||||
|
}
|
||||||
|
assertThat(loadStatManagers).containsNoDuplicates();
|
||||||
|
}
|
||||||
|
|
||||||
private Map<String, ?> defaultBootstrapOverride() {
|
private Map<String, ?> defaultBootstrapOverride() {
|
||||||
return ImmutableMap.of(
|
return ImmutableMap.of(
|
||||||
"node", ImmutableMap.of(
|
"node", ImmutableMap.of(
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue