diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java index 4ba768030a..549e46031d 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer2.java @@ -162,7 +162,6 @@ final class EdsLoadBalancer2 extends LoadBalancer { private ChildLbState(Helper helper) { if (lrsServerName != null) { loadStatsStore = xdsClient.addClientStats(cluster, edsServiceName); - xdsClient.reportClientStats(); } else { loadStatsStore = null; } @@ -218,7 +217,6 @@ final class EdsLoadBalancer2 extends LoadBalancer { @Override public void shutdown() { if (lrsServerName != null) { - xdsClient.cancelClientStatsReport(); xdsClient.removeClientStats(cluster, edsServiceName); } xdsClient.cancelEdsResourceWatch(resourceName, this); diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 7f08816fb2..80d6a730a1 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -593,24 +593,10 @@ abstract class XdsClient { void watchListenerData(int port, ListenerWatcher watcher) { } - /** - * Starts client side load reporting via LRS. All clusters report load through one LRS stream, - * only the first call of this method effectively starts the LRS stream. - */ - void reportClientStats() { - } - - /** - * Stops client side load reporting via LRS. All clusters report load through one LRS stream, - * only the last call of this method effectively stops the LRS stream. - */ - void cancelClientStatsReport() { - } - /** * Starts recording client load stats for the given cluster:cluster_service. Caller should use * the returned {@link LoadStatsStore} to record and aggregate stats for load sent to the given - * cluster:cluster_service. Recorded stats may be reported to a load reporting server if enabled. + * cluster:cluster_service. The first call of this method starts load reporting via LRS. */ LoadStatsStore addClientStats(String clusterName, @Nullable String clusterServiceName) { throw new UnsupportedOperationException(); @@ -619,6 +605,7 @@ abstract class XdsClient { /** * Stops recording client load stats for the given cluster:cluster_service. The load reporting * server will no longer receive stats for the given cluster:cluster_service after this call. + * Load reporting may be terminated if there is no stats to be reported. */ void removeClientStats(String clusterName, @Nullable String clusterServiceName) { throw new UnsupportedOperationException(); diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl2.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl2.java index 61e5fdc2b8..aef999fed3 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl2.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl2.java @@ -109,7 +109,6 @@ final class XdsClientImpl2 extends XdsClient { private final SynchronizationContext syncContext; private final ScheduledExecutorService timeService; private final BackoffPolicy.Provider backoffPolicyProvider; - private final Supplier stopwatchSupplier; private final Stopwatch adsStreamRetryStopwatch; // The node identifier to be included in xDS requests. Management server only requires the // first request to carry the node identifier on a stream. It should be identical if present @@ -122,6 +121,7 @@ final class XdsClientImpl2 extends XdsClient { private final Map edsResourceSubscribers = new HashMap<>(); private final LoadStatsManager loadStatsManager = new LoadStatsManager(); + private final LoadReportClient lrsClient; // Last successfully applied version_info for each resource type. Starts with empty string. // A version_info is used to update management server with client's most recent knowledge of @@ -137,9 +137,7 @@ final class XdsClientImpl2 extends XdsClient { private BackoffPolicy retryBackoffPolicy; @Nullable private ScheduledHandle rpcRetryTimer; - @Nullable - private LoadReportClient lrsClient; - private int loadReportCount; // number of clusters enabling load reporting + private boolean reportingLoad; // For server side usage. @Nullable @@ -160,8 +158,9 @@ final class XdsClientImpl2 extends XdsClient { this.syncContext = checkNotNull(syncContext, "syncContext"); this.timeService = checkNotNull(timeService, "timeService"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); - this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatch"); - adsStreamRetryStopwatch = stopwatchSupplier.get(); + adsStreamRetryStopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get(); + lrsClient = new LoadReportClient(loadStatsManager, xdsChannel, node, syncContext, timeService, + backoffPolicyProvider, stopwatchSupplier); logId = InternalLogId.allocate("xds-client", null); logger = XdsLogger.withLogId(logId); logger.log(XdsLogLevel.INFO, "Created"); @@ -175,9 +174,8 @@ final class XdsClientImpl2 extends XdsClient { adsStream.close(Status.CANCELLED.withDescription("shutdown").asException()); } cleanUpResourceTimers(); - if (lrsClient != null) { + if (reportingLoad) { lrsClient.stopLoadReporting(); - lrsClient = null; } if (rpcRetryTimer != null) { rpcRetryTimer.cancel(); @@ -336,45 +334,20 @@ final class XdsClientImpl2 extends XdsClient { node.toBuilder().setMetadata(newMetadata).addListeningAddresses(listeningAddress).build(); } - @Override - void reportClientStats() { - if (lrsClient == null) { - logger.log(XdsLogLevel.INFO, "Turning on load reporting"); - lrsClient = - new LoadReportClient( - loadStatsManager, - xdsChannel, - node, - syncContext, - timeService, - backoffPolicyProvider, - stopwatchSupplier); - } - if (loadReportCount == 0) { - lrsClient.startLoadReporting(); - } - loadReportCount++; - } - - @Override - void cancelClientStatsReport() { - checkState(loadReportCount > 0, "load reporting was never started"); - loadReportCount--; - if (loadReportCount == 0) { - logger.log(XdsLogLevel.INFO, "Turning off load reporting"); - lrsClient.stopLoadReporting(); - lrsClient = null; - } - } - @Override LoadStatsStore addClientStats(String clusterName, @Nullable String clusterServiceName) { - return loadStatsManager.addLoadStats(clusterName, clusterServiceName); + LoadStatsStore loadStatsStore = loadStatsManager.addLoadStats(clusterName, clusterServiceName); + if (!reportingLoad) { + lrsClient.startLoadReporting(); + reportingLoad = true; + } + return loadStatsStore; } @Override void removeClientStats(String clusterName, @Nullable String clusterServiceName) { loadStatsManager.removeLoadStats(clusterName, clusterServiceName); + // TODO(chengyuanzhang): stop load reporting if containing no stats. } @Override diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest2.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest2.java index 8cebecb57e..f4ce3a321a 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest2.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest2.java @@ -1509,7 +1509,6 @@ public class XdsClientImplTest2 { String clusterName = "cluster-foo.googleapis.com"; xdsClient.addClientStats(clusterName, null); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(null); - xdsClient.reportClientStats(); RpcCall lrsCall = loadReportCalls.poll(); verify(lrsCall.requestObserver).onNext(requestCaptor.capture()); assertThat(requestCaptor.getValue().getClusterStatsCount()) @@ -1531,8 +1530,6 @@ public class XdsClientImplTest2 { assertThat(requestCaptor.getValue().getClusterStatsCount()) .isEqualTo(0); // no more stats reported - xdsClient.cancelClientStatsReport(); - assertThat(lrsEnded.get()).isTrue(); // See more test on LoadReportClientTest.java }