mirror of https://github.com/grpc/grpc-java.git
xds: simplify XdsClient APIs to start load reporting automatically when the first stats is added (#7523)
Eliminate reportClientStats/cancelClientStatsReport APIs. The first call of addClientStats will start load reporting.
This commit is contained in:
parent
5ee264da90
commit
0b6f29371b
|
|
@ -162,7 +162,6 @@ final class EdsLoadBalancer2 extends LoadBalancer {
|
||||||
private ChildLbState(Helper helper) {
|
private ChildLbState(Helper helper) {
|
||||||
if (lrsServerName != null) {
|
if (lrsServerName != null) {
|
||||||
loadStatsStore = xdsClient.addClientStats(cluster, edsServiceName);
|
loadStatsStore = xdsClient.addClientStats(cluster, edsServiceName);
|
||||||
xdsClient.reportClientStats();
|
|
||||||
} else {
|
} else {
|
||||||
loadStatsStore = null;
|
loadStatsStore = null;
|
||||||
}
|
}
|
||||||
|
|
@ -218,7 +217,6 @@ final class EdsLoadBalancer2 extends LoadBalancer {
|
||||||
@Override
|
@Override
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
if (lrsServerName != null) {
|
if (lrsServerName != null) {
|
||||||
xdsClient.cancelClientStatsReport();
|
|
||||||
xdsClient.removeClientStats(cluster, edsServiceName);
|
xdsClient.removeClientStats(cluster, edsServiceName);
|
||||||
}
|
}
|
||||||
xdsClient.cancelEdsResourceWatch(resourceName, this);
|
xdsClient.cancelEdsResourceWatch(resourceName, this);
|
||||||
|
|
|
||||||
|
|
@ -593,24 +593,10 @@ abstract class XdsClient {
|
||||||
void watchListenerData(int port, ListenerWatcher watcher) {
|
void watchListenerData(int port, ListenerWatcher watcher) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Starts client side load reporting via LRS. All clusters report load through one LRS stream,
|
|
||||||
* only the first call of this method effectively starts the LRS stream.
|
|
||||||
*/
|
|
||||||
void reportClientStats() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Stops client side load reporting via LRS. All clusters report load through one LRS stream,
|
|
||||||
* only the last call of this method effectively stops the LRS stream.
|
|
||||||
*/
|
|
||||||
void cancelClientStatsReport() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts recording client load stats for the given cluster:cluster_service. Caller should use
|
* Starts recording client load stats for the given cluster:cluster_service. Caller should use
|
||||||
* the returned {@link LoadStatsStore} to record and aggregate stats for load sent to the given
|
* the returned {@link LoadStatsStore} to record and aggregate stats for load sent to the given
|
||||||
* cluster:cluster_service. Recorded stats may be reported to a load reporting server if enabled.
|
* cluster:cluster_service. The first call of this method starts load reporting via LRS.
|
||||||
*/
|
*/
|
||||||
LoadStatsStore addClientStats(String clusterName, @Nullable String clusterServiceName) {
|
LoadStatsStore addClientStats(String clusterName, @Nullable String clusterServiceName) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
|
@ -619,6 +605,7 @@ abstract class XdsClient {
|
||||||
/**
|
/**
|
||||||
* Stops recording client load stats for the given cluster:cluster_service. The load reporting
|
* Stops recording client load stats for the given cluster:cluster_service. The load reporting
|
||||||
* server will no longer receive stats for the given cluster:cluster_service after this call.
|
* server will no longer receive stats for the given cluster:cluster_service after this call.
|
||||||
|
* Load reporting may be terminated if there is no stats to be reported.
|
||||||
*/
|
*/
|
||||||
void removeClientStats(String clusterName, @Nullable String clusterServiceName) {
|
void removeClientStats(String clusterName, @Nullable String clusterServiceName) {
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
|
|
|
||||||
|
|
@ -109,7 +109,6 @@ final class XdsClientImpl2 extends XdsClient {
|
||||||
private final SynchronizationContext syncContext;
|
private final SynchronizationContext syncContext;
|
||||||
private final ScheduledExecutorService timeService;
|
private final ScheduledExecutorService timeService;
|
||||||
private final BackoffPolicy.Provider backoffPolicyProvider;
|
private final BackoffPolicy.Provider backoffPolicyProvider;
|
||||||
private final Supplier<Stopwatch> stopwatchSupplier;
|
|
||||||
private final Stopwatch adsStreamRetryStopwatch;
|
private final Stopwatch adsStreamRetryStopwatch;
|
||||||
// The node identifier to be included in xDS requests. Management server only requires the
|
// The node identifier to be included in xDS requests. Management server only requires the
|
||||||
// first request to carry the node identifier on a stream. It should be identical if present
|
// first request to carry the node identifier on a stream. It should be identical if present
|
||||||
|
|
@ -122,6 +121,7 @@ final class XdsClientImpl2 extends XdsClient {
|
||||||
private final Map<String, ResourceSubscriber> edsResourceSubscribers = new HashMap<>();
|
private final Map<String, ResourceSubscriber> edsResourceSubscribers = new HashMap<>();
|
||||||
|
|
||||||
private final LoadStatsManager loadStatsManager = new LoadStatsManager();
|
private final LoadStatsManager loadStatsManager = new LoadStatsManager();
|
||||||
|
private final LoadReportClient lrsClient;
|
||||||
|
|
||||||
// Last successfully applied version_info for each resource type. Starts with empty string.
|
// Last successfully applied version_info for each resource type. Starts with empty string.
|
||||||
// A version_info is used to update management server with client's most recent knowledge of
|
// A version_info is used to update management server with client's most recent knowledge of
|
||||||
|
|
@ -137,9 +137,7 @@ final class XdsClientImpl2 extends XdsClient {
|
||||||
private BackoffPolicy retryBackoffPolicy;
|
private BackoffPolicy retryBackoffPolicy;
|
||||||
@Nullable
|
@Nullable
|
||||||
private ScheduledHandle rpcRetryTimer;
|
private ScheduledHandle rpcRetryTimer;
|
||||||
@Nullable
|
private boolean reportingLoad;
|
||||||
private LoadReportClient lrsClient;
|
|
||||||
private int loadReportCount; // number of clusters enabling load reporting
|
|
||||||
|
|
||||||
// For server side usage.
|
// For server side usage.
|
||||||
@Nullable
|
@Nullable
|
||||||
|
|
@ -160,8 +158,9 @@ final class XdsClientImpl2 extends XdsClient {
|
||||||
this.syncContext = checkNotNull(syncContext, "syncContext");
|
this.syncContext = checkNotNull(syncContext, "syncContext");
|
||||||
this.timeService = checkNotNull(timeService, "timeService");
|
this.timeService = checkNotNull(timeService, "timeService");
|
||||||
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
|
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
|
||||||
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatch");
|
adsStreamRetryStopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
|
||||||
adsStreamRetryStopwatch = stopwatchSupplier.get();
|
lrsClient = new LoadReportClient(loadStatsManager, xdsChannel, node, syncContext, timeService,
|
||||||
|
backoffPolicyProvider, stopwatchSupplier);
|
||||||
logId = InternalLogId.allocate("xds-client", null);
|
logId = InternalLogId.allocate("xds-client", null);
|
||||||
logger = XdsLogger.withLogId(logId);
|
logger = XdsLogger.withLogId(logId);
|
||||||
logger.log(XdsLogLevel.INFO, "Created");
|
logger.log(XdsLogLevel.INFO, "Created");
|
||||||
|
|
@ -175,9 +174,8 @@ final class XdsClientImpl2 extends XdsClient {
|
||||||
adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
|
adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
|
||||||
}
|
}
|
||||||
cleanUpResourceTimers();
|
cleanUpResourceTimers();
|
||||||
if (lrsClient != null) {
|
if (reportingLoad) {
|
||||||
lrsClient.stopLoadReporting();
|
lrsClient.stopLoadReporting();
|
||||||
lrsClient = null;
|
|
||||||
}
|
}
|
||||||
if (rpcRetryTimer != null) {
|
if (rpcRetryTimer != null) {
|
||||||
rpcRetryTimer.cancel();
|
rpcRetryTimer.cancel();
|
||||||
|
|
@ -336,45 +334,20 @@ final class XdsClientImpl2 extends XdsClient {
|
||||||
node.toBuilder().setMetadata(newMetadata).addListeningAddresses(listeningAddress).build();
|
node.toBuilder().setMetadata(newMetadata).addListeningAddresses(listeningAddress).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
void reportClientStats() {
|
|
||||||
if (lrsClient == null) {
|
|
||||||
logger.log(XdsLogLevel.INFO, "Turning on load reporting");
|
|
||||||
lrsClient =
|
|
||||||
new LoadReportClient(
|
|
||||||
loadStatsManager,
|
|
||||||
xdsChannel,
|
|
||||||
node,
|
|
||||||
syncContext,
|
|
||||||
timeService,
|
|
||||||
backoffPolicyProvider,
|
|
||||||
stopwatchSupplier);
|
|
||||||
}
|
|
||||||
if (loadReportCount == 0) {
|
|
||||||
lrsClient.startLoadReporting();
|
|
||||||
}
|
|
||||||
loadReportCount++;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void cancelClientStatsReport() {
|
|
||||||
checkState(loadReportCount > 0, "load reporting was never started");
|
|
||||||
loadReportCount--;
|
|
||||||
if (loadReportCount == 0) {
|
|
||||||
logger.log(XdsLogLevel.INFO, "Turning off load reporting");
|
|
||||||
lrsClient.stopLoadReporting();
|
|
||||||
lrsClient = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
LoadStatsStore addClientStats(String clusterName, @Nullable String clusterServiceName) {
|
LoadStatsStore addClientStats(String clusterName, @Nullable String clusterServiceName) {
|
||||||
return loadStatsManager.addLoadStats(clusterName, clusterServiceName);
|
LoadStatsStore loadStatsStore = loadStatsManager.addLoadStats(clusterName, clusterServiceName);
|
||||||
|
if (!reportingLoad) {
|
||||||
|
lrsClient.startLoadReporting();
|
||||||
|
reportingLoad = true;
|
||||||
|
}
|
||||||
|
return loadStatsStore;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void removeClientStats(String clusterName, @Nullable String clusterServiceName) {
|
void removeClientStats(String clusterName, @Nullable String clusterServiceName) {
|
||||||
loadStatsManager.removeLoadStats(clusterName, clusterServiceName);
|
loadStatsManager.removeLoadStats(clusterName, clusterServiceName);
|
||||||
|
// TODO(chengyuanzhang): stop load reporting if containing no stats.
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -1509,7 +1509,6 @@ public class XdsClientImplTest2 {
|
||||||
String clusterName = "cluster-foo.googleapis.com";
|
String clusterName = "cluster-foo.googleapis.com";
|
||||||
xdsClient.addClientStats(clusterName, null);
|
xdsClient.addClientStats(clusterName, null);
|
||||||
ArgumentCaptor<LoadStatsRequest> requestCaptor = ArgumentCaptor.forClass(null);
|
ArgumentCaptor<LoadStatsRequest> requestCaptor = ArgumentCaptor.forClass(null);
|
||||||
xdsClient.reportClientStats();
|
|
||||||
RpcCall<LoadStatsRequest, LoadStatsResponse> lrsCall = loadReportCalls.poll();
|
RpcCall<LoadStatsRequest, LoadStatsResponse> lrsCall = loadReportCalls.poll();
|
||||||
verify(lrsCall.requestObserver).onNext(requestCaptor.capture());
|
verify(lrsCall.requestObserver).onNext(requestCaptor.capture());
|
||||||
assertThat(requestCaptor.getValue().getClusterStatsCount())
|
assertThat(requestCaptor.getValue().getClusterStatsCount())
|
||||||
|
|
@ -1531,8 +1530,6 @@ public class XdsClientImplTest2 {
|
||||||
assertThat(requestCaptor.getValue().getClusterStatsCount())
|
assertThat(requestCaptor.getValue().getClusterStatsCount())
|
||||||
.isEqualTo(0); // no more stats reported
|
.isEqualTo(0); // no more stats reported
|
||||||
|
|
||||||
xdsClient.cancelClientStatsReport();
|
|
||||||
assertThat(lrsEnded.get()).isTrue();
|
|
||||||
// See more test on LoadReportClientTest.java
|
// See more test on LoadReportClientTest.java
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue