mirror of https://github.com/grpc/grpc-java.git
xds: report load stats for all clusters over a single LRS client (#6706)
The current implementation of client-side load reporting is incorrect: with the current design of using a single management server, each gRPC channel should have at most one LRS stream. In this change: - Each LoadStatsStore instance is associated with clusterName:clusterServiceName. Both clusterName and clusterServiceName (nullable) are required to construct a LoadStatsStore instance. - Semantically, a LoadStatsStore is responsible for recording loads sent to that cluster service of the cluster. The load report it produces (via LoadStatsStore#generateLoadReport()) will have cluster_name and cluster_service_name (if not null) set. - A LoadReportClient is responsible for reporting loads for all clusters. A LoadStatsStore is added to the LoadReportClient via LoadReportClient#addLoadStatsStore(clusterName, clusterServiceName, loadStatsStore). This must be done before LoadReportClient#startLoadReporting() is called, because of the open question of how to report loads for additional clusters once reporting has started. - An XdsClient contains a single LoadReportClient instance. XdsClient#reportClientStats(clusterName, clusterServiceName, loadStatsStore) calls LoadReportClient#addLoadStatsStore(clusterName, clusterServiceName, loadStatsStore) and then starts the client. XdsClient#cancelClientStatsReport(clusterName, clusterServiceName) calls LoadReportClient#removeLoadStatsStore(clusterName, clusterServiceName) and stops it. LoadReportClient#addLoadStatsStore(clusterName, clusterServiceName, loadStatsStore) cannot be called repeatedly because, once load reporting has started, the set of clusters to report loads for cannot be changed. However, a sequence of report, cancel, report, cancel, and so on is supported. - EdsLoadBalancer is refactored slightly to accommodate the new APIs for enabling/disabling load reporting. The ClusterEndpointsLoadBalancer instance carries its own LoadStatsStore and controls starting/cancelling load reporting. - The LoadReportClient interface is eliminated; LoadReportClient becomes purely a subcomponent of XdsClient.
Note: Currently we assume the cluster/EDS service never switches, which means we report loads for a single cluster/EDS service. We therefore restrict LoadReportClient#addLoadStatsStore() so that it cannot be called after load reporting has started. This restriction will be removed once the open question above is resolved.
This commit is contained in:
parent
19de229b7f
commit
88c027bac3
|
|
@ -46,7 +46,6 @@ import io.grpc.xds.XdsClient.XdsChannelFactory;
|
|||
import io.grpc.xds.XdsClient.XdsClientFactory;
|
||||
import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig;
|
||||
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
|
@ -63,8 +62,6 @@ final class EdsLoadBalancer extends LoadBalancer {
|
|||
private final Bootstrapper bootstrapper;
|
||||
private final XdsChannelFactory channelFactory;
|
||||
private final Helper edsLbHelper;
|
||||
// Cache for load stats stores for each service in cluster keyed by cluster service names.
|
||||
private final Map<String, LoadStatsStore> loadStatsStoreMap = new HashMap<>();
|
||||
|
||||
// Most recent XdsConfig.
|
||||
@Nullable
|
||||
|
|
@ -74,9 +71,10 @@ final class EdsLoadBalancer extends LoadBalancer {
|
|||
@Nullable
|
||||
private XdsClient xdsClient;
|
||||
@Nullable
|
||||
private LoadReportClient loadReportClient;
|
||||
@Nullable
|
||||
private String clusterName;
|
||||
// FIXME(chengyuanzhang): should be one instance per cluster:cluster_service.
|
||||
@Nullable
|
||||
private LoadStatsStore loadStatsStore;
|
||||
|
||||
EdsLoadBalancer(Helper edsLbHelper, ResourceUpdateCallback resourceUpdateCallback) {
|
||||
this(
|
||||
|
|
@ -187,21 +185,22 @@ final class EdsLoadBalancer extends LoadBalancer {
|
|||
// TODO(zdapeng): Use the correct cluster name. Currently load reporting will be broken if
|
||||
// edsServiceName is changed because we are using edsServiceName for the cluster name.
|
||||
clusterName = clusterServiceName;
|
||||
loadStatsStore = new LoadStatsStoreImpl(clusterName, null);
|
||||
}
|
||||
|
||||
boolean shouldReportStats = newXdsConfig.lrsServerName != null;
|
||||
if (shouldReportStats && !isReportingStats()) {
|
||||
// Start load reporting. This may be a restarting after previously stopping the load
|
||||
// reporting, so need to re-add all the pre-existing loadStatsStores to the new
|
||||
// loadReportClient.
|
||||
loadReportClient = xdsClient.reportClientStats(clusterName, newXdsConfig.lrsServerName);
|
||||
for (Map.Entry<String, LoadStatsStore> entry : loadStatsStoreMap.entrySet()) {
|
||||
loadReportClient.addLoadStatsStore(entry.getKey(), entry.getValue());
|
||||
// FIXME(chengyuanzhang): should report loads for each cluster:cluster_service.
|
||||
if (xdsConfig == null
|
||||
|| !Objects.equals(newXdsConfig.lrsServerName, xdsConfig.lrsServerName)) {
|
||||
if (newXdsConfig.lrsServerName != null) {
|
||||
if (!newXdsConfig.lrsServerName.equals("")) {
|
||||
throw new AssertionError(
|
||||
"Can only report load to the same management server");
|
||||
}
|
||||
xdsClient.reportClientStats(clusterName, null, loadStatsStore);
|
||||
} else if (xdsConfig != null) {
|
||||
xdsClient.cancelClientStatsReport(clusterName, null);
|
||||
}
|
||||
}
|
||||
if (!shouldReportStats && isReportingStats()) {
|
||||
cancelClientStatsReport();
|
||||
}
|
||||
|
||||
// Note: childPolicy change will be handled in LocalityStore, to be implemented.
|
||||
// If edsServiceName in XdsConfig is changed, do a graceful switch.
|
||||
|
|
@ -232,25 +231,14 @@ final class EdsLoadBalancer extends LoadBalancer {
|
|||
public void shutdown() {
|
||||
channelLogger.log(ChannelLogLevel.DEBUG, "EDS load balancer is shutting down");
|
||||
switchingLoadBalancer.shutdown();
|
||||
if (isReportingStats()) {
|
||||
cancelClientStatsReport();
|
||||
}
|
||||
if (xdsClient != null) {
|
||||
if (xdsConfig != null && xdsConfig.lrsServerName != null) {
|
||||
xdsClient.cancelClientStatsReport(clusterName, null);
|
||||
}
|
||||
xdsClient = xdsClientPool.returnObject(xdsClient);
|
||||
}
|
||||
}
|
||||
|
||||
/** Whether the client stats for the cluster is currently reported to the traffic director. */
|
||||
private boolean isReportingStats() {
|
||||
return loadReportClient != null;
|
||||
}
|
||||
|
||||
/** Stops to report client stats for the cluster. */
|
||||
private void cancelClientStatsReport() {
|
||||
xdsClient.cancelClientStatsReport(clusterName);
|
||||
loadReportClient = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* A load balancer factory that provides a load balancer for a given cluster service.
|
||||
*/
|
||||
|
|
@ -291,11 +279,6 @@ final class EdsLoadBalancer extends LoadBalancer {
|
|||
ClusterEndpointsBalancer(Helper helper) {
|
||||
this.helper = helper;
|
||||
|
||||
LoadStatsStore loadStatsStore = new LoadStatsStoreImpl();
|
||||
loadStatsStoreMap.put(clusterServiceName, loadStatsStore);
|
||||
if (isReportingStats()) {
|
||||
loadReportClient.addLoadStatsStore(clusterServiceName, loadStatsStore);
|
||||
}
|
||||
localityStore = localityStoreFactory.newLocalityStore(helper, lbRegistry, loadStatsStore);
|
||||
|
||||
endpointWatcher = new EndpointWatcherImpl(localityStore);
|
||||
|
|
@ -320,10 +303,6 @@ final class EdsLoadBalancer extends LoadBalancer {
|
|||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
loadStatsStoreMap.remove(clusterServiceName);
|
||||
if (isReportingStats()) {
|
||||
loadReportClient.removeLoadStatsStore(clusterServiceName);
|
||||
}
|
||||
localityStore.reset();
|
||||
xdsClient.cancelEndpointDataWatch(clusterServiceName, endpointWatcher);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,53 +16,361 @@
|
|||
|
||||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.protobuf.util.Durations;
|
||||
import io.envoyproxy.envoy.api.v2.core.Node;
|
||||
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
|
||||
import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc;
|
||||
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest;
|
||||
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.SynchronizationContext.ScheduledHandle;
|
||||
import io.grpc.internal.BackoffPolicy;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.NotThreadSafe;
|
||||
|
||||
/**
|
||||
* A {@link LoadReportClient} is the gRPC client's load reporting agent that establishes
|
||||
* connections to traffic director for reporting load stats from gRPC client's perspective.
|
||||
*
|
||||
* <p>Each {@link LoadReportClient} instance is responsible for reporting loads for a single
|
||||
* <b>cluster</b>.
|
||||
* Client of xDS load reporting service based on LRS protocol, which reports load stats of
|
||||
* gRPC client's perspective to a management server.
|
||||
*/
|
||||
@NotThreadSafe
|
||||
interface LoadReportClient {
|
||||
final class LoadReportClient {
|
||||
private static final Logger logger = Logger.getLogger(XdsClientImpl.class.getName());
|
||||
|
||||
private final ManagedChannel channel;
|
||||
private final Node node;
|
||||
private final SynchronizationContext syncContext;
|
||||
private final ScheduledExecutorService timerService;
|
||||
private final Supplier<Stopwatch> stopwatchSupplier;
|
||||
private final Stopwatch retryStopwatch;
|
||||
private final BackoffPolicy.Provider backoffPolicyProvider;
|
||||
|
||||
// Sources of load stats data for each cluster.
|
||||
// FIXME(chengyuanzhang): this should be Map<String, Map<String, LoadStatsStore>> as each
|
||||
// ClusterStats is keyed by cluster:cluster_service. Currently, cluster_service is always unset.
|
||||
private final Map<String, LoadStatsStore> loadStatsStoreMap = new HashMap<>();
|
||||
private boolean started;
|
||||
|
||||
@Nullable
|
||||
private BackoffPolicy lrsRpcRetryPolicy;
|
||||
@Nullable
|
||||
private ScheduledHandle lrsRpcRetryTimer;
|
||||
@Nullable
|
||||
private LrsStream lrsStream;
|
||||
@Nullable
|
||||
private LoadReportCallback callback;
|
||||
|
||||
/**
 * Creates a load reporting client that is not yet started.
 *
 * <p>All arguments must be non-null. The channel is only used to create LRS stubs; it is not
 * owned by this client. A dedicated stopwatch for measuring time between RPC attempts is
 * obtained from {@code stopwatchSupplier} here.
 */
LoadReportClient(
    ManagedChannel channel,
    Node node,
    SynchronizationContext syncContext,
    ScheduledExecutorService scheduledExecutorService,
    BackoffPolicy.Provider backoffPolicyProvider,
    Supplier<Stopwatch> stopwatchSupplier) {
  this.channel = checkNotNull(channel, "channel");
  this.node = checkNotNull(node, "node");
  this.syncContext = checkNotNull(syncContext, "syncContext");
  // The label names the actual parameter so a NullPointerException points at the right argument.
  this.timerService = checkNotNull(scheduledExecutorService, "scheduledExecutorService");
  this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
  this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
  this.retryStopwatch = stopwatchSupplier.get();
  started = false;
}
|
||||
|
||||
/**
|
||||
* Establishes load reporting communication and negotiates with traffic director to report load
|
||||
* stats periodically. Calling this method on an already started {@link LoadReportClient} is
|
||||
* no-op.
|
||||
*
|
||||
* @param callback containing methods to be invoked for passing information received from load
|
||||
* reporting responses to xDS load balancer.
|
||||
*/
|
||||
// TODO(chengyuanzhang): do not expose this method.
|
||||
void startLoadReporting(LoadReportCallback callback);
|
||||
public void startLoadReporting(LoadReportCallback callback) {
|
||||
if (started) {
|
||||
return;
|
||||
}
|
||||
this.callback = callback;
|
||||
started = true;
|
||||
startLrsRpc();
|
||||
}
|
||||
|
||||
/**
|
||||
* Terminates load reporting. Calling this method on an already stopped
|
||||
* {@link LoadReportClient} is no-op.
|
||||
*
|
||||
*/
|
||||
// TODO(chengyuanzhang): do not expose this method.
|
||||
void stopLoadReporting();
|
||||
public void stopLoadReporting() {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
if (lrsRpcRetryTimer != null) {
|
||||
lrsRpcRetryTimer.cancel();
|
||||
}
|
||||
if (lrsStream != null) {
|
||||
lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException());
|
||||
}
|
||||
started = false;
|
||||
// Do not shutdown channel as it is not owned by LrsClient.
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides this LoadReportClient source of load stats data for the given cluster service.
|
||||
* If requested, data from the given {@code loadStatsStore} is periodically queried and
|
||||
* sent to traffic director by this LoadReportClient.
|
||||
* Provides this LoadReportClient source of load stats data for the given
|
||||
* cluster:cluster_service. If requested, data from the given loadStatsStore is
|
||||
* periodically queried and sent to traffic director by this LoadReportClient.
|
||||
*
|
||||
* @param clusterServiceName name of the cluster service.
|
||||
* @param loadStatsStore storage of load stats.
|
||||
* <p>Currently the load stats stores for all clusters to be reported must be added before
|
||||
* load reporting starts, so that the initial LRS request can tell the management server which
|
||||
* clusters this client is reporting loads for. The design for reporting loads for additional
|
||||
* clusters after load reporting has started is TBD.
|
||||
*
|
||||
* <p>Note: currently clusterServiceName is always unset.
|
||||
*/
|
||||
void addLoadStatsStore(String clusterServiceName, LoadStatsStore loadStatsStore);
|
||||
public void addLoadStatsStore(
|
||||
String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) {
|
||||
checkState(
|
||||
!loadStatsStoreMap.containsKey(clusterName),
|
||||
"load stats for cluster " + clusterName + " already exists");
|
||||
// FIXME(chengyuanzhang): relax this restriction after design is fleshed out.
|
||||
checkState(
|
||||
!started,
|
||||
"load stats for all clusters to report loads for should be provided before "
|
||||
+ "load reporting has started");
|
||||
loadStatsStoreMap.put(clusterName, loadStatsStore);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops providing load stats data for the given cluster service.
|
||||
* Stops providing load stats data for the given cluster:cluster_service.
|
||||
*
|
||||
* @param clusterServiceName name of the cluster service.
|
||||
* <p>Note: currently clusterServiceName is always unset.
|
||||
*/
|
||||
void removeLoadStatsStore(String clusterServiceName);
|
||||
public void removeLoadStatsStore(String clusterName, @Nullable String clusterServiceName) {
|
||||
checkState(
|
||||
loadStatsStoreMap.containsKey(clusterName),
|
||||
"load stats for cluster " + clusterName + " does not exist");
|
||||
loadStatsStoreMap.remove(clusterName);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static class LoadReportingTask implements Runnable {
|
||||
private final LrsStream stream;
|
||||
|
||||
LoadReportingTask(LrsStream stream) {
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
stream.sendLoadReport();
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
class LrsRpcRetryTask implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
startLrsRpc();
|
||||
}
|
||||
}
|
||||
|
||||
private void startLrsRpc() {
|
||||
checkState(lrsStream == null, "previous lbStream has not been cleared yet");
|
||||
LoadReportingServiceGrpc.LoadReportingServiceStub stub
|
||||
= LoadReportingServiceGrpc.newStub(channel);
|
||||
lrsStream = new LrsStream(stub, stopwatchSupplier.get());
|
||||
retryStopwatch.reset().start();
|
||||
lrsStream.start();
|
||||
}
|
||||
|
||||
private class LrsStream implements StreamObserver<LoadStatsResponse> {
|
||||
|
||||
// Cluster to report loads for asked by management server.
|
||||
final Set<String> clusterNames = new HashSet<>();
|
||||
final LoadReportingServiceGrpc.LoadReportingServiceStub stub;
|
||||
final Stopwatch reportStopwatch;
|
||||
StreamObserver<LoadStatsRequest> lrsRequestWriter;
|
||||
boolean initialResponseReceived;
|
||||
boolean closed;
|
||||
long loadReportIntervalNano = -1;
|
||||
ScheduledHandle loadReportTimer;
|
||||
|
||||
LrsStream(LoadReportingServiceGrpc.LoadReportingServiceStub stub, Stopwatch stopwatch) {
|
||||
this.stub = checkNotNull(stub, "stub");
|
||||
reportStopwatch = checkNotNull(stopwatch, "stopwatch");
|
||||
}
|
||||
|
||||
void start() {
|
||||
lrsRequestWriter = stub.withWaitForReady().streamLoadStats(this);
|
||||
reportStopwatch.reset().start();
|
||||
// Tells management server which clusters the client is reporting loads for.
|
||||
List<ClusterStats> clusterStatsList = new ArrayList<>();
|
||||
for (String clusterName : loadStatsStoreMap.keySet()) {
|
||||
clusterStatsList.add(ClusterStats.newBuilder().setClusterName(clusterName).build());
|
||||
}
|
||||
LoadStatsRequest initRequest =
|
||||
LoadStatsRequest.newBuilder()
|
||||
.setNode(node)
|
||||
.addAllClusterStats(clusterStatsList)
|
||||
.build();
|
||||
lrsRequestWriter.onNext(initRequest);
|
||||
logger.log(Level.FINE, "Initial LRS request sent: {0}", initRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(final LoadStatsResponse response) {
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
handleResponse(response);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final Throwable t) {
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
handleStreamClosed(Status.fromThrowable(t)
|
||||
.augmentDescription("Stream to XDS management server had an error"));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
handleStreamClosed(
|
||||
Status.UNAVAILABLE.withDescription("Stream to XDS management server was closed"));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void sendLoadReport() {
|
||||
long interval = reportStopwatch.elapsed(TimeUnit.NANOSECONDS);
|
||||
reportStopwatch.reset().start();
|
||||
LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node);
|
||||
for (String name : clusterNames) {
|
||||
if (loadStatsStoreMap.containsKey(name)) {
|
||||
LoadStatsStore loadStatsStore = loadStatsStoreMap.get(name);
|
||||
ClusterStats report =
|
||||
loadStatsStore.generateLoadReport()
|
||||
.toBuilder()
|
||||
.setLoadReportInterval(Durations.fromNanos(interval))
|
||||
.build();
|
||||
requestBuilder.addClusterStats(report);
|
||||
}
|
||||
}
|
||||
LoadStatsRequest request = requestBuilder.build();
|
||||
lrsRequestWriter.onNext(request);
|
||||
logger.log(Level.FINE, "Sent LoadStatsRequest\n{0}", request);
|
||||
scheduleNextLoadReport();
|
||||
}
|
||||
|
||||
private void scheduleNextLoadReport() {
|
||||
// Cancel pending load report and reschedule with updated load reporting interval.
|
||||
if (loadReportTimer != null && loadReportTimer.isPending()) {
|
||||
loadReportTimer.cancel();
|
||||
loadReportTimer = null;
|
||||
}
|
||||
if (loadReportIntervalNano > 0) {
|
||||
loadReportTimer = syncContext.schedule(
|
||||
new LoadReportingTask(this), loadReportIntervalNano, TimeUnit.NANOSECONDS,
|
||||
timerService);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleResponse(LoadStatsResponse response) {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!initialResponseReceived) {
|
||||
logger.log(Level.FINE, "Received LRS initial response: {0}", response);
|
||||
initialResponseReceived = true;
|
||||
} else {
|
||||
logger.log(Level.FINE, "Received an LRS response: {0}", response);
|
||||
}
|
||||
loadReportIntervalNano = Durations.toNanos(response.getLoadReportingInterval());
|
||||
callback.onReportResponse(loadReportIntervalNano);
|
||||
clusterNames.clear();
|
||||
clusterNames.addAll(response.getClustersList());
|
||||
scheduleNextLoadReport();
|
||||
}
|
||||
|
||||
private void handleStreamClosed(Status status) {
|
||||
checkArgument(!status.isOk(), "unexpected OK status");
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
closed = true;
|
||||
cleanUp();
|
||||
|
||||
long delayNanos = 0;
|
||||
if (initialResponseReceived || lrsRpcRetryPolicy == null) {
|
||||
// Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
|
||||
// has never been initialized.
|
||||
lrsRpcRetryPolicy = backoffPolicyProvider.get();
|
||||
}
|
||||
// Backoff only when balancer wasn't working previously.
|
||||
if (!initialResponseReceived) {
|
||||
// The back-off policy determines the interval between consecutive RPC upstarts, thus the
|
||||
// actual delay may be smaller than the value from the back-off policy, or even negative,
|
||||
// depending how much time was spent in the previous RPC.
|
||||
delayNanos =
|
||||
lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
|
||||
}
|
||||
logger.log(Level.FINE, "LRS stream closed, backoff in {0} second(s)",
|
||||
TimeUnit.NANOSECONDS.toSeconds(delayNanos <= 0 ? 0 : delayNanos));
|
||||
if (delayNanos <= 0) {
|
||||
startLrsRpc();
|
||||
} else {
|
||||
lrsRpcRetryTimer =
|
||||
syncContext.schedule(new LrsRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS,
|
||||
timerService);
|
||||
}
|
||||
}
|
||||
|
||||
private void close(@Nullable Exception error) {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
closed = true;
|
||||
cleanUp();
|
||||
if (error == null) {
|
||||
lrsRequestWriter.onCompleted();
|
||||
} else {
|
||||
lrsRequestWriter.onError(error);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanUp() {
|
||||
if (loadReportTimer != null) {
|
||||
loadReportTimer.cancel();
|
||||
loadReportTimer = null;
|
||||
}
|
||||
if (lrsStream == this) {
|
||||
lrsStream = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Callbacks for passing information received from client load reporting responses to xDS load
|
||||
|
|
|
|||
|
|
@ -1,335 +0,0 @@
|
|||
/*
|
||||
* Copyright 2019 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.protobuf.util.Durations;
|
||||
import io.envoyproxy.envoy.api.v2.core.Node;
|
||||
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
|
||||
import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc;
|
||||
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest;
|
||||
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.SynchronizationContext.ScheduledHandle;
|
||||
import io.grpc.internal.BackoffPolicy;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.NotThreadSafe;
|
||||
|
||||
/**
|
||||
* Client of xDS load reporting service based on LRS protocol.
|
||||
*/
|
||||
@NotThreadSafe
|
||||
final class LoadReportClientImpl implements LoadReportClient {
|
||||
|
||||
// TODO(chengyuanzhang): use channel logger once XdsClientImpl migrates to use channel logger.
|
||||
private static final Logger logger = Logger.getLogger(XdsClientImpl.class.getName());
|
||||
|
||||
private final String clusterName;
|
||||
private final ManagedChannel channel;
|
||||
private final Node node;
|
||||
private final SynchronizationContext syncContext;
|
||||
private final ScheduledExecutorService timerService;
|
||||
private final Supplier<Stopwatch> stopwatchSupplier;
|
||||
private final Stopwatch retryStopwatch;
|
||||
private final BackoffPolicy.Provider backoffPolicyProvider;
|
||||
|
||||
// Sources of load stats data for each service in cluster.
|
||||
private final Map<String, LoadStatsStore> loadStatsStoreMap = new HashMap<>();
|
||||
private boolean started;
|
||||
|
||||
@Nullable
|
||||
private BackoffPolicy lrsRpcRetryPolicy;
|
||||
@Nullable
|
||||
private ScheduledHandle lrsRpcRetryTimer;
|
||||
@Nullable
|
||||
private LrsStream lrsStream;
|
||||
@Nullable
|
||||
private LoadReportCallback callback;
|
||||
|
||||
/**
 * Creates a load reporting client for {@code clusterName} that is not yet started.
 *
 * <p>All arguments must be non-null. The channel is only used to create LRS stubs; it is not
 * owned by this client.
 */
LoadReportClientImpl(ManagedChannel channel,
    String clusterName,
    Node node,
    SynchronizationContext syncContext,
    ScheduledExecutorService scheduledExecutorService,
    BackoffPolicy.Provider backoffPolicyProvider,
    Supplier<Stopwatch> stopwatchSupplier) {
  this.channel = checkNotNull(channel, "channel");
  this.clusterName = checkNotNull(clusterName, "clusterName");
  this.node = checkNotNull(node, "node");
  this.syncContext = checkNotNull(syncContext, "syncContext");
  // The label names the actual parameter so a NullPointerException points at the right argument.
  this.timerService = checkNotNull(scheduledExecutorService, "scheduledExecutorService");
  this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
  this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
  this.retryStopwatch = stopwatchSupplier.get();
  started = false;
}
|
||||
|
||||
@Override
|
||||
public void startLoadReporting(LoadReportCallback callback) {
|
||||
if (started) {
|
||||
return;
|
||||
}
|
||||
this.callback = callback;
|
||||
started = true;
|
||||
startLrsRpc();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopLoadReporting() {
|
||||
if (!started) {
|
||||
return;
|
||||
}
|
||||
if (lrsRpcRetryTimer != null) {
|
||||
lrsRpcRetryTimer.cancel();
|
||||
}
|
||||
if (lrsStream != null) {
|
||||
lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException());
|
||||
}
|
||||
started = false;
|
||||
// Do not shutdown channel as it is not owned by LrsClient.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addLoadStatsStore(String clusterServiceName, LoadStatsStore loadStatsStore) {
|
||||
loadStatsStoreMap.put(clusterServiceName, loadStatsStore);
|
||||
}
|
||||
|
||||
// Drops the load stats store registered under clusterServiceName; nothing happens when no
// store is registered under that name.
@Override
|
||||
public void removeLoadStatsStore(String clusterServiceName) {
|
||||
loadStatsStoreMap.remove(clusterServiceName);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static class LoadReportingTask implements Runnable {
|
||||
private final LrsStream stream;
|
||||
|
||||
LoadReportingTask(LrsStream stream) {
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
stream.sendLoadReport();
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
class LrsRpcRetryTask implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
startLrsRpc();
|
||||
}
|
||||
}
|
||||
|
||||
private void startLrsRpc() {
|
||||
checkState(lrsStream == null, "previous lbStream has not been cleared yet");
|
||||
LoadReportingServiceGrpc.LoadReportingServiceStub stub
|
||||
= LoadReportingServiceGrpc.newStub(channel);
|
||||
lrsStream = new LrsStream(stub, stopwatchSupplier.get());
|
||||
retryStopwatch.reset().start();
|
||||
lrsStream.start();
|
||||
}
|
||||
|
||||
private class LrsStream implements StreamObserver<LoadStatsResponse> {
|
||||
|
||||
// Cluster services to report loads for, instructed by LRS responses.
|
||||
final Set<String> clusterServiceNames = new HashSet<>();
|
||||
final LoadReportingServiceGrpc.LoadReportingServiceStub stub;
|
||||
final Stopwatch reportStopwatch;
|
||||
StreamObserver<LoadStatsRequest> lrsRequestWriter;
|
||||
boolean initialResponseReceived;
|
||||
boolean closed;
|
||||
long loadReportIntervalNano = -1;
|
||||
ScheduledHandle loadReportTimer;
|
||||
|
||||
LrsStream(LoadReportingServiceGrpc.LoadReportingServiceStub stub, Stopwatch stopwatch) {
|
||||
this.stub = checkNotNull(stub, "stub");
|
||||
reportStopwatch = checkNotNull(stopwatch, "stopwatch");
|
||||
}
|
||||
|
||||
void start() {
|
||||
lrsRequestWriter = stub.withWaitForReady().streamLoadStats(this);
|
||||
reportStopwatch.reset().start();
|
||||
LoadStatsRequest initRequest =
|
||||
LoadStatsRequest.newBuilder()
|
||||
.setNode(node)
|
||||
.addClusterStats(ClusterStats.newBuilder().setClusterName(clusterName))
|
||||
.build();
|
||||
lrsRequestWriter.onNext(initRequest);
|
||||
logger.log(Level.FINE, "Initial LRS request sent: {0}", initRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(final LoadStatsResponse response) {
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
handleResponse(response);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(final Throwable t) {
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
handleStreamClosed(Status.fromThrowable(t)
|
||||
.augmentDescription("Stream to XDS management server had an error"));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
syncContext.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
handleStreamClosed(
|
||||
Status.UNAVAILABLE.withDescription("Stream to XDS management server was closed"));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void sendLoadReport() {
|
||||
long interval = reportStopwatch.elapsed(TimeUnit.NANOSECONDS);
|
||||
reportStopwatch.reset().start();
|
||||
LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node);
|
||||
for (String serviceName : clusterServiceNames) {
|
||||
if (loadStatsStoreMap.containsKey(serviceName)) {
|
||||
LoadStatsStore loadStatsStore = loadStatsStoreMap.get(serviceName);
|
||||
ClusterStats report =
|
||||
loadStatsStore.generateLoadReport()
|
||||
.toBuilder()
|
||||
.setClusterName(serviceName)
|
||||
.setLoadReportInterval(Durations.fromNanos(interval))
|
||||
.build();
|
||||
requestBuilder.addClusterStats(report);
|
||||
}
|
||||
}
|
||||
LoadStatsRequest request = requestBuilder.build();
|
||||
lrsRequestWriter.onNext(request);
|
||||
logger.log(Level.FINE, "Sent LoadStatsRequest\n{0}", request);
|
||||
scheduleNextLoadReport();
|
||||
}
|
||||
|
||||
private void scheduleNextLoadReport() {
|
||||
// Cancel pending load report and reschedule with updated load reporting interval.
|
||||
if (loadReportTimer != null && loadReportTimer.isPending()) {
|
||||
loadReportTimer.cancel();
|
||||
loadReportTimer = null;
|
||||
}
|
||||
if (loadReportIntervalNano > 0) {
|
||||
loadReportTimer = syncContext.schedule(
|
||||
new LoadReportingTask(this), loadReportIntervalNano, TimeUnit.NANOSECONDS,
|
||||
timerService);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleResponse(LoadStatsResponse response) {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!initialResponseReceived) {
|
||||
logger.log(Level.FINE, "Received LRS initial response: {0}", response);
|
||||
initialResponseReceived = true;
|
||||
} else {
|
||||
logger.log(Level.FINE, "Received an LRS response: {0}", response);
|
||||
}
|
||||
loadReportIntervalNano = Durations.toNanos(response.getLoadReportingInterval());
|
||||
callback.onReportResponse(loadReportIntervalNano);
|
||||
clusterServiceNames.clear();
|
||||
clusterServiceNames.addAll(response.getClustersList());
|
||||
scheduleNextLoadReport();
|
||||
}
|
||||
|
||||
private void handleStreamClosed(Status status) {
|
||||
checkArgument(!status.isOk(), "unexpected OK status");
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
closed = true;
|
||||
cleanUp();
|
||||
|
||||
long delayNanos = 0;
|
||||
if (initialResponseReceived || lrsRpcRetryPolicy == null) {
|
||||
// Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
|
||||
// has never been initialized.
|
||||
lrsRpcRetryPolicy = backoffPolicyProvider.get();
|
||||
}
|
||||
// Backoff only when balancer wasn't working previously.
|
||||
if (!initialResponseReceived) {
|
||||
// The back-off policy determines the interval between consecutive RPC upstarts, thus the
|
||||
// actual delay may be smaller than the value from the back-off policy, or even negative,
|
||||
// depending how much time was spent in the previous RPC.
|
||||
delayNanos =
|
||||
lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
|
||||
}
|
||||
logger.log(Level.FINE, "LRS stream closed, backoff in {0} second(s)",
|
||||
TimeUnit.NANOSECONDS.toSeconds(delayNanos <= 0 ? 0 : delayNanos));
|
||||
if (delayNanos <= 0) {
|
||||
startLrsRpc();
|
||||
} else {
|
||||
lrsRpcRetryTimer =
|
||||
syncContext.schedule(new LrsRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS,
|
||||
timerService);
|
||||
}
|
||||
}
|
||||
|
||||
private void close(@Nullable Exception error) {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
closed = true;
|
||||
cleanUp();
|
||||
if (error == null) {
|
||||
lrsRequestWriter.onCompleted();
|
||||
} else {
|
||||
lrsRequestWriter.onError(error);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanUp() {
|
||||
if (loadReportTimer != null) {
|
||||
loadReportTimer.cancel();
|
||||
loadReportTimer = null;
|
||||
}
|
||||
if (lrsStream == this) {
|
||||
lrsStream = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -21,14 +21,14 @@ import io.grpc.xds.EnvoyProtoData.Locality;
|
|||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Interface for client side load stats store. An {@code LoadStatsStore} maintains load stats for
|
||||
* a service cluster (i.e., GSLB service) exposed by traffic director from a gRPC client's
|
||||
* perspective, including dropped calls instructed by traffic director. Load stats for endpoints
|
||||
* (i.e., Google backends) are aggregated in locality granularity (i.e., Google cluster) while the
|
||||
* numbers of dropped calls are aggregated in cluster granularity.
|
||||
* Interface for client side load stats store. An {@code LoadStatsStore} maintains load stats per
|
||||
* cluster:cluster_service exposed by traffic director from a gRPC client's perspective,
|
||||
* including dropped calls. Load stats for endpoints (i.e., Google backends) are aggregated in
|
||||
* locality granularity (i.e., Google cluster) while the numbers of dropped calls are aggregated
|
||||
* in cluster:cluster_service granularity.
|
||||
*
|
||||
* <p>An {@code LoadStatsStore} lives the same span of lifecycle as a cluster and
|
||||
* only tracks loads for localities exposed by remote traffic director. A proper usage should be
|
||||
* <p>An {@code LoadStatsStore} only tracks loads for localities exposed by remote traffic
|
||||
* director. A proper usage should be
|
||||
*
|
||||
* <ol>
|
||||
* <li>Let {@link LoadStatsStore} track the locality newly exposed by traffic director by
|
||||
|
|
@ -41,10 +41,6 @@ import javax.annotation.Nullable;
|
|||
*
|
||||
* <p>No locality information is needed for recording dropped calls since they are aggregated in
|
||||
* cluster granularity.
|
||||
*
|
||||
* <p>Note implementations should only be responsible for keeping track of loads and generating
|
||||
* load reports with load data, any load reporting information should be opaque to {@code
|
||||
* LoadStatsStore} and be set outside.
|
||||
*/
|
||||
interface LoadStatsStore {
|
||||
|
||||
|
|
@ -61,7 +57,7 @@ interface LoadStatsStore {
|
|||
* reporting.
|
||||
*
|
||||
* <p>This method is not thread-safe and should be called from the same synchronized context
|
||||
* used by {@link XdsClient}.
|
||||
* used by {@link LoadReportClient}.
|
||||
*/
|
||||
ClusterStats generateLoadReport();
|
||||
|
||||
|
|
@ -70,10 +66,10 @@ interface LoadStatsStore {
|
|||
* endpoints in added localities will be recorded and included in generated load reports.
|
||||
*
|
||||
* <p>This method needs to be called at locality updates only for newly assigned localities in
|
||||
* balancer discovery responses before recording loads for those localities.
|
||||
* endpoint discovery responses before recording loads for those localities.
|
||||
*
|
||||
* <p>This method is not thread-safe and should be called from the same synchronized context
|
||||
* used by {@link XdsClient}.
|
||||
* used by {@link LoadReportClient}.
|
||||
*/
|
||||
void addLocality(Locality locality);
|
||||
|
||||
|
|
@ -88,7 +84,7 @@ interface LoadStatsStore {
|
|||
* waste and keep including zero-load upstream locality stats in generated load reports.
|
||||
*
|
||||
* <p>This method is not thread-safe and should be called from the same synchronized context
|
||||
* used by {@link XdsClient}.
|
||||
* used by {@link LoadReportClient}.
|
||||
*/
|
||||
void removeLocality(Locality locality);
|
||||
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ import java.util.Map;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.NotThreadSafe;
|
||||
|
||||
/**
|
||||
|
|
@ -40,19 +41,27 @@ import javax.annotation.concurrent.NotThreadSafe;
|
|||
*/
|
||||
@NotThreadSafe
|
||||
final class LoadStatsStoreImpl implements LoadStatsStore {
|
||||
|
||||
private final String clusterName;
|
||||
@Nullable
|
||||
@SuppressWarnings("unused")
|
||||
private final String clusterServiceName;
|
||||
private final ConcurrentMap<Locality, ClientLoadCounter> localityLoadCounters;
|
||||
// Cluster level dropped request counts for each category decision made by xDS load balancer.
|
||||
private final ConcurrentMap<String, AtomicLong> dropCounters;
|
||||
|
||||
LoadStatsStoreImpl() {
|
||||
this(new ConcurrentHashMap<Locality, ClientLoadCounter>(),
|
||||
LoadStatsStoreImpl(String clusterName, @Nullable String clusterServiceName) {
|
||||
this(clusterName, clusterServiceName, new ConcurrentHashMap<Locality, ClientLoadCounter>(),
|
||||
new ConcurrentHashMap<String, AtomicLong>());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
LoadStatsStoreImpl(ConcurrentMap<Locality, ClientLoadCounter> localityLoadCounters,
|
||||
LoadStatsStoreImpl(
|
||||
String clusterName,
|
||||
@Nullable String clusterServiceName,
|
||||
ConcurrentMap<Locality, ClientLoadCounter> localityLoadCounters,
|
||||
ConcurrentMap<String, AtomicLong> dropCounters) {
|
||||
this.clusterName = checkNotNull(clusterName, "clusterName");
|
||||
this.clusterServiceName = clusterServiceName;
|
||||
this.localityLoadCounters = checkNotNull(localityLoadCounters, "localityLoadCounters");
|
||||
this.dropCounters = checkNotNull(dropCounters, "dropCounters");
|
||||
}
|
||||
|
|
@ -60,6 +69,8 @@ final class LoadStatsStoreImpl implements LoadStatsStore {
|
|||
@Override
|
||||
public ClusterStats generateLoadReport() {
|
||||
ClusterStats.Builder statsBuilder = ClusterStats.newBuilder();
|
||||
statsBuilder.setClusterName(clusterName);
|
||||
// TODO(chengyuanzhang): also set cluster_service_name if provided.
|
||||
for (Map.Entry<Locality, ClientLoadCounter> entry : localityLoadCounters.entrySet()) {
|
||||
ClientLoadSnapshot snapshot = entry.getValue().snapshot();
|
||||
UpstreamLocalityStats.Builder localityStatsBuilder =
|
||||
|
|
|
|||
|
|
@ -417,16 +417,21 @@ abstract class XdsClient {
|
|||
}
|
||||
|
||||
/**
|
||||
* Starts reporting client load stats to a remote server for the given cluster.
|
||||
* Report client load stats to a remote server for the given cluster:cluster_service.
|
||||
*
|
||||
* <p>Note: currently we can only report loads for a single cluster:cluster_service,
|
||||
* as the design for adding clusters to report loads for while load reporting is
|
||||
* happening is undefined.
|
||||
*/
|
||||
LoadReportClient reportClientStats(String clusterName, String serverUri) {
|
||||
void reportClientStats(
|
||||
String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops reporting client load stats to the remote server for the given cluster.
|
||||
* Stops reporting client load stats to the remote server for the given cluster:cluster_service.
|
||||
*/
|
||||
void cancelClientStatsReport(String clusterName) {
|
||||
void cancelClientStatsReport(String clusterName, @Nullable String clusterServiceName) {
|
||||
}
|
||||
|
||||
abstract static class XdsClientFactory {
|
||||
|
|
|
|||
|
|
@ -124,9 +124,6 @@ final class XdsClientImpl extends XdsClient {
|
|||
// watchers can watch endpoints in the same cluster.
|
||||
private final Map<String, Set<EndpointWatcher>> endpointWatchers = new HashMap<>();
|
||||
|
||||
// Load reporting clients, with each responsible for reporting loads of a single cluster.
|
||||
private final Map<String, LoadReportClientImpl> lrsClients = new HashMap<>();
|
||||
|
||||
// Resource fetch timers are used to conclude absence of resources. Each timer is activated when
|
||||
// subscription for the resource starts and disarmed on first update for the resource.
|
||||
|
||||
|
|
@ -150,6 +147,8 @@ final class XdsClientImpl extends XdsClient {
|
|||
private BackoffPolicy retryBackoffPolicy;
|
||||
@Nullable
|
||||
private ScheduledHandle rpcRetryTimer;
|
||||
@Nullable
|
||||
private LoadReportClient lrsClient;
|
||||
|
||||
// Following fields are set only after the ConfigWatcher registered. Once set, they should
|
||||
// never change.
|
||||
|
|
@ -189,8 +188,9 @@ final class XdsClientImpl extends XdsClient {
|
|||
adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
|
||||
}
|
||||
cleanUpResources();
|
||||
for (LoadReportClientImpl lrsClient : lrsClients.values()) {
|
||||
if (lrsClient != null) {
|
||||
lrsClient.stopLoadReporting();
|
||||
lrsClient = null;
|
||||
}
|
||||
if (rpcRetryTimer != null) {
|
||||
rpcRetryTimer.cancel();
|
||||
|
|
@ -405,37 +405,31 @@ final class XdsClientImpl extends XdsClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
LoadReportClient reportClientStats(String clusterName, String serverUri) {
|
||||
checkNotNull(serverUri, "serverUri");
|
||||
checkArgument(serverUri.equals(""),
|
||||
"Currently only support empty serverUri, which defaults to the same "
|
||||
+ "management server this client talks to.");
|
||||
if (!lrsClients.containsKey(clusterName)) {
|
||||
LoadReportClientImpl lrsClient =
|
||||
new LoadReportClientImpl(
|
||||
channel,
|
||||
clusterName,
|
||||
node,
|
||||
syncContext,
|
||||
timeService,
|
||||
backoffPolicyProvider,
|
||||
stopwatchSupplier);
|
||||
lrsClient.startLoadReporting(
|
||||
new LoadReportCallback() {
|
||||
@Override
|
||||
public void onReportResponse(long reportIntervalNano) {}
|
||||
});
|
||||
lrsClients.put(clusterName, lrsClient);
|
||||
}
|
||||
return lrsClients.get(clusterName);
|
||||
void reportClientStats(
|
||||
String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) {
|
||||
checkState(lrsClient == null,
|
||||
"load reporting has already started, cannot change clusters to report loads for");
|
||||
lrsClient =
|
||||
new LoadReportClient(
|
||||
channel,
|
||||
node,
|
||||
syncContext,
|
||||
timeService,
|
||||
backoffPolicyProvider,
|
||||
stopwatchSupplier);
|
||||
lrsClient.addLoadStatsStore(clusterName, clusterServiceName, loadStatsStore);
|
||||
lrsClient.startLoadReporting(new LoadReportCallback() {
|
||||
@Override
|
||||
public void onReportResponse(long reportIntervalNano) {}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
void cancelClientStatsReport(String clusterName) {
|
||||
LoadReportClientImpl lrsClient = lrsClients.remove(clusterName);
|
||||
if (lrsClient != null) {
|
||||
lrsClient.stopLoadReporting();
|
||||
}
|
||||
void cancelClientStatsReport(String clusterName, @Nullable String clusterServiceName) {
|
||||
checkState(lrsClient != null, "load reporting was never started");
|
||||
lrsClient.removeLoadStatsStore(clusterName, clusterServiceName);
|
||||
lrsClient.stopLoadReporting();
|
||||
lrsClient = null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.protobuf.util.Durations;
|
||||
|
|
@ -54,6 +53,8 @@ import io.grpc.stub.StreamObserver;
|
|||
import io.grpc.testing.GrpcCleanupRule;
|
||||
import io.grpc.xds.LoadReportClient.LoadReportCallback;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
|
@ -74,19 +75,17 @@ import org.mockito.Mock;
|
|||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link LoadReportClientImpl}.
|
||||
* Unit tests for {@link LoadReportClient}.
|
||||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class LoadReportClientImplTest {
|
||||
|
||||
private static final String CLUSTER_NAME = "foo.blade.googleapis.com";
|
||||
public class LoadReportClientTest {
|
||||
private static final Node NODE = Node.newBuilder().setId("LRS test").build();
|
||||
private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER =
|
||||
new FakeClock.TaskFilter() {
|
||||
@Override
|
||||
public boolean shouldAccept(Runnable command) {
|
||||
return command.toString()
|
||||
.contains(LoadReportClientImpl.LoadReportingTask.class.getSimpleName());
|
||||
.contains(LoadReportClient.LoadReportingTask.class.getSimpleName());
|
||||
}
|
||||
};
|
||||
private static final FakeClock.TaskFilter LRS_RPC_RETRY_TASK_FILTER =
|
||||
|
|
@ -94,14 +93,9 @@ public class LoadReportClientImplTest {
|
|||
@Override
|
||||
public boolean shouldAccept(Runnable command) {
|
||||
return command.toString()
|
||||
.contains(LoadReportClientImpl.LrsRpcRetryTask.class.getSimpleName());
|
||||
.contains(LoadReportClient.LrsRpcRetryTask.class.getSimpleName());
|
||||
}
|
||||
};
|
||||
private static final LoadStatsRequest EXPECTED_INITIAL_REQ =
|
||||
LoadStatsRequest.newBuilder()
|
||||
.setNode(NODE)
|
||||
.addClusterStats(ClusterStats.newBuilder().setClusterName(CLUSTER_NAME))
|
||||
.build();
|
||||
|
||||
@Rule
|
||||
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
|
||||
|
|
@ -134,7 +128,7 @@ public class LoadReportClientImplTest {
|
|||
|
||||
private LoadReportingServiceGrpc.LoadReportingServiceImplBase mockLoadReportingService;
|
||||
private ManagedChannel channel;
|
||||
private LoadReportClientImpl lrsClient;
|
||||
private LoadReportClient lrsClient;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Before
|
||||
|
|
@ -172,14 +166,12 @@ public class LoadReportClientImplTest {
|
|||
when(backoffPolicy2.nextBackoffNanos())
|
||||
.thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L));
|
||||
lrsClient =
|
||||
new LoadReportClientImpl(
|
||||
new LoadReportClient(
|
||||
channel,
|
||||
CLUSTER_NAME,
|
||||
NODE, syncContext,
|
||||
fakeClock.getScheduledExecutorService(),
|
||||
backoffPolicyProvider,
|
||||
fakeClock.getStopwatchSupplier());
|
||||
lrsClient.startLoadReporting(callback);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
@ -190,21 +182,29 @@ public class LoadReportClientImplTest {
|
|||
|
||||
@Test
|
||||
public void typicalWorkflow() {
|
||||
String cluster1 = "cluster-foo.googleapis.com";
|
||||
String cluster2 = "cluster-bar.googleapis.com";
|
||||
ClusterStats rawStats1 = generateClusterLoadStats(cluster1);
|
||||
ClusterStats rawStats2 = generateClusterLoadStats(cluster2);
|
||||
when(loadStatsStore1.generateLoadReport()).thenReturn(rawStats1);
|
||||
when(loadStatsStore2.generateLoadReport()).thenReturn(rawStats2);
|
||||
lrsClient.addLoadStatsStore(cluster1, null, loadStatsStore1);
|
||||
lrsClient.addLoadStatsStore(cluster2, null, loadStatsStore2);
|
||||
lrsClient.startLoadReporting(callback);
|
||||
|
||||
verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
|
||||
StreamObserver<LoadStatsResponse> responseObserver = lrsResponseObserverCaptor.getValue();
|
||||
StreamObserver<LoadStatsRequest> requestObserver =
|
||||
Iterables.getOnlyElement(lrsRequestObservers);
|
||||
InOrder inOrder = inOrder(requestObserver);
|
||||
inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
|
||||
InOrder inOrder = inOrder(requestObserver, callback);
|
||||
inOrder.verify(requestObserver).onNext(eq(buildInitialRequest(cluster1, cluster2)));
|
||||
|
||||
String service1 = "namespace-foo:service-blade";
|
||||
ClusterStats rawStats1 = generateServiceLoadStats();
|
||||
when(loadStatsStore1.generateLoadReport()).thenReturn(rawStats1);
|
||||
lrsClient.addLoadStatsStore(service1, loadStatsStore1);
|
||||
responseObserver.onNext(buildLrsResponse(ImmutableList.of(service1), 1000));
|
||||
// Management server asks to report loads for cluster1.
|
||||
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 1000));
|
||||
inOrder.verify(callback).onReportResponse(1000);
|
||||
|
||||
ArgumentMatcher<LoadStatsRequest> expectedLoadReportMatcher =
|
||||
new LoadStatsRequestMatcher(ImmutableMap.of(service1, rawStats1), 1000);
|
||||
new LoadStatsRequestMatcher(ImmutableList.of(rawStats1), 1000);
|
||||
fakeClock.forwardNanos(999);
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
fakeClock.forwardNanos(1);
|
||||
|
|
@ -214,48 +214,55 @@ public class LoadReportClientImplTest {
|
|||
inOrder.verify(requestObserver).onNext(argThat(expectedLoadReportMatcher));
|
||||
|
||||
// Management server updates the interval of sending load reports.
|
||||
responseObserver.onNext(buildLrsResponse(ImmutableList.of(service1), 2000));
|
||||
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 2000));
|
||||
inOrder.verify(callback).onReportResponse(2000);
|
||||
|
||||
fakeClock.forwardNanos(1000);
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
|
||||
fakeClock.forwardNanos(1000);
|
||||
inOrder.verify(requestObserver)
|
||||
.onNext(argThat(new LoadStatsRequestMatcher(ImmutableMap.of(service1, rawStats1), 2000)));
|
||||
.onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(rawStats1), 2000)));
|
||||
|
||||
String service2 = "namespace-bar:service-baz";
|
||||
ClusterStats rawStats2 = generateServiceLoadStats();
|
||||
when(loadStatsStore2.generateLoadReport()).thenReturn(rawStats2);
|
||||
lrsClient.addLoadStatsStore(service2, loadStatsStore2);
|
||||
|
||||
// Management server asks to report loads for an extra cluster service.
|
||||
responseObserver.onNext(buildLrsResponse(ImmutableList.of(service1, service2), 2000));
|
||||
// Management server asks to report loads for cluster1 and cluster2.
|
||||
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1, cluster2), 2000));
|
||||
inOrder.verify(callback).onReportResponse(2000);
|
||||
|
||||
fakeClock.forwardNanos(2000);
|
||||
inOrder.verify(requestObserver)
|
||||
.onNext(
|
||||
argThat(
|
||||
new LoadStatsRequestMatcher(
|
||||
ImmutableMap.of(service1, rawStats1, service2, rawStats2), 2000)));
|
||||
new LoadStatsRequestMatcher(ImmutableList.of(rawStats1, rawStats2), 2000)));
|
||||
|
||||
// Load reports for one of existing service is no longer wanted.
|
||||
responseObserver.onNext(buildLrsResponse(ImmutableList.of(service2), 2000));
|
||||
// Load reports for cluster1 is no longer wanted.
|
||||
responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster2), 2000));
|
||||
inOrder.verify(callback).onReportResponse(2000);
|
||||
|
||||
fakeClock.forwardNanos(2000);
|
||||
inOrder.verify(requestObserver)
|
||||
.onNext(argThat(new LoadStatsRequestMatcher(ImmutableMap.of(service2, rawStats2), 2000)));
|
||||
.onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(rawStats2), 2000)));
|
||||
|
||||
// Management server asks loads for a cluster service that client has no load data.
|
||||
responseObserver.onNext(buildLrsResponse(ImmutableList.of("namespace-ham:service-spam"), 2000));
|
||||
// Management server asks loads for a cluster that client has no load data.
|
||||
responseObserver
|
||||
.onNext(buildLrsResponse(ImmutableList.of("cluster-unknown.googleapis.com"), 2000));
|
||||
inOrder.verify(callback).onReportResponse(2000);
|
||||
|
||||
fakeClock.forwardNanos(2000);
|
||||
ArgumentCaptor<LoadStatsRequest> reportCaptor = ArgumentCaptor.forClass(null);
|
||||
inOrder.verify(requestObserver).onNext(reportCaptor.capture());
|
||||
assertThat(reportCaptor.getValue().getClusterStatsCount()).isEqualTo(0);
|
||||
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void lrsStreamClosedAndRetried() {
|
||||
String clusterName = "cluster-foo.googleapis.com";
|
||||
ClusterStats stats = generateClusterLoadStats(clusterName);
|
||||
when(loadStatsStore1.generateLoadReport()).thenReturn(stats);
|
||||
lrsClient.addLoadStatsStore(clusterName, null, loadStatsStore1);
|
||||
lrsClient.startLoadReporting(callback);
|
||||
|
||||
InOrder inOrder = inOrder(mockLoadReportingService, backoffPolicyProvider, backoffPolicy1,
|
||||
backoffPolicy2);
|
||||
inOrder.verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
|
||||
|
|
@ -264,7 +271,7 @@ public class LoadReportClientImplTest {
|
|||
StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
|
||||
|
||||
// First balancer RPC
|
||||
verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
|
||||
verify(requestObserver).onNext(eq(buildInitialRequest(clusterName)));
|
||||
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
|
||||
|
||||
// Balancer closes it immediately (erroneously)
|
||||
|
|
@ -284,7 +291,7 @@ public class LoadReportClientImplTest {
|
|||
responseObserver = lrsResponseObserverCaptor.getValue();
|
||||
assertThat(lrsRequestObservers).hasSize(1);
|
||||
requestObserver = lrsRequestObservers.poll();
|
||||
verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ));
|
||||
verify(requestObserver).onNext(eq(buildInitialRequest(clusterName)));
|
||||
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
|
||||
|
||||
// Balancer closes it with an error.
|
||||
|
|
@ -303,13 +310,12 @@ public class LoadReportClientImplTest {
|
|||
responseObserver = lrsResponseObserverCaptor.getValue();
|
||||
assertThat(lrsRequestObservers).hasSize(1);
|
||||
requestObserver = lrsRequestObservers.poll();
|
||||
verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ));
|
||||
verify(requestObserver).onNext(eq(buildInitialRequest(clusterName)));
|
||||
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
|
||||
|
||||
// Balancer sends a response asking for loads of some cluster service.
|
||||
String serviceName = "namespace-foo:service-blade";
|
||||
// Balancer sends a response asking for loads of the cluster.
|
||||
responseObserver
|
||||
.onNext(buildLrsResponse(ImmutableList.of(serviceName), 0));
|
||||
.onNext(buildLrsResponse(ImmutableList.of(clusterName), 0));
|
||||
|
||||
// Then breaks the RPC
|
||||
responseObserver.onError(Status.UNAVAILABLE.asException());
|
||||
|
|
@ -320,7 +326,7 @@ public class LoadReportClientImplTest {
|
|||
responseObserver = lrsResponseObserverCaptor.getValue();
|
||||
assertThat(lrsRequestObservers).hasSize(1);
|
||||
requestObserver = lrsRequestObservers.poll();
|
||||
verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ));
|
||||
verify(requestObserver).onNext(eq(buildInitialRequest(clusterName)));
|
||||
|
||||
// Fail the retry after spending 4ns
|
||||
fakeClock.forwardNanos(4);
|
||||
|
|
@ -338,19 +344,16 @@ public class LoadReportClientImplTest {
|
|||
inOrder.verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
|
||||
assertThat(lrsRequestObservers).hasSize(1);
|
||||
requestObserver = lrsRequestObservers.poll();
|
||||
verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ));
|
||||
verify(requestObserver).onNext(eq(buildInitialRequest(clusterName)));
|
||||
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
|
||||
|
||||
// Load reporting back to normal.
|
||||
responseObserver = lrsResponseObserverCaptor.getValue();
|
||||
ClusterStats stats = generateServiceLoadStats();
|
||||
when(loadStatsStore1.generateLoadReport()).thenReturn(stats);
|
||||
lrsClient.addLoadStatsStore(serviceName, loadStatsStore1);
|
||||
responseObserver
|
||||
.onNext(buildLrsResponse(ImmutableList.of(serviceName), 10));
|
||||
.onNext(buildLrsResponse(ImmutableList.of(clusterName), 10));
|
||||
fakeClock.forwardNanos(10);
|
||||
verify(requestObserver)
|
||||
.onNext(argThat(new LoadStatsRequestMatcher(ImmutableMap.of(serviceName, stats), 10)));
|
||||
.onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(stats), 10)));
|
||||
|
||||
// Wrapping up
|
||||
verify(backoffPolicyProvider, times(2)).get();
|
||||
|
|
@ -360,13 +363,19 @@ public class LoadReportClientImplTest {
|
|||
|
||||
@Test
|
||||
public void raceBetweenLoadReportingAndLbStreamClosure() {
|
||||
String clusterName = "cluster-foo.googleapis.com";
|
||||
ClusterStats stats = generateClusterLoadStats(clusterName);
|
||||
when(loadStatsStore1.generateLoadReport()).thenReturn(stats);
|
||||
lrsClient.addLoadStatsStore(clusterName, null, loadStatsStore1);
|
||||
lrsClient.startLoadReporting(callback);
|
||||
|
||||
verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
|
||||
StreamObserver<LoadStatsResponse> responseObserver = lrsResponseObserverCaptor.getValue();
|
||||
assertThat(lrsRequestObservers).hasSize(1);
|
||||
StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
|
||||
|
||||
// First balancer RPC
|
||||
verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
|
||||
verify(requestObserver).onNext(eq(buildInitialRequest(clusterName)));
|
||||
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
|
||||
|
||||
// Simulate receiving a response from traffic director.
|
||||
|
|
@ -394,19 +403,28 @@ public class LoadReportClientImplTest {
|
|||
}
|
||||
|
||||
private static LoadStatsResponse buildLrsResponse(
|
||||
List<String> clusterServiceNames, long loadReportIntervalNanos) {
|
||||
List<String> clusterNames, long loadReportIntervalNanos) {
|
||||
return
|
||||
LoadStatsResponse
|
||||
.newBuilder()
|
||||
.addAllClusters(clusterServiceNames)
|
||||
.addAllClusters(clusterNames)
|
||||
.setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNanos))
|
||||
.build();
|
||||
}
|
||||
|
||||
private static LoadStatsRequest buildInitialRequest(String... clusters) {
|
||||
List<ClusterStats> clusterStatsList = new ArrayList<>();
|
||||
for (String cluster : clusters) {
|
||||
clusterStatsList.add(ClusterStats.newBuilder().setClusterName(cluster).build());
|
||||
}
|
||||
return
|
||||
LoadStatsRequest.newBuilder().setNode(NODE).addAllClusterStats(clusterStatsList).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a raw service load stats report with random data.
|
||||
*/
|
||||
private static ClusterStats generateServiceLoadStats() {
|
||||
private static ClusterStats generateClusterLoadStats(String clusterName) {
|
||||
long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
|
||||
long callsSucceeded = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
|
||||
long callsFailed = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
|
||||
|
|
@ -416,6 +434,7 @@ public class LoadReportClientImplTest {
|
|||
|
||||
return
|
||||
ClusterStats.newBuilder()
|
||||
.setClusterName(clusterName)
|
||||
.addUpstreamLocalityStats(
|
||||
UpstreamLocalityStats.newBuilder()
|
||||
.setLocality(
|
||||
|
|
@ -440,21 +459,18 @@ public class LoadReportClientImplTest {
|
|||
}
|
||||
|
||||
/**
|
||||
* For comparing LoadStatsRequest based on a collection of raw service load stats.
|
||||
* For comparing LoadStatsRequest stats data regardless of the order of the cluster stats.
|
||||
*/
|
||||
private static class LoadStatsRequestMatcher implements ArgumentMatcher<LoadStatsRequest> {
|
||||
private final Map<String, ClusterStats> expectedStats = new HashMap<>();
|
||||
|
||||
LoadStatsRequestMatcher(Map<String, ClusterStats> serviceStats, long expectedIntervalNano) {
|
||||
for (String serviceName : serviceStats.keySet()) {
|
||||
// TODO(chengyuanzhang): the field to be populated should be cluster_service_name.
|
||||
LoadStatsRequestMatcher(Collection<ClusterStats> clusterStats, long expectedIntervalNano) {
|
||||
for (ClusterStats stats : clusterStats) {
|
||||
ClusterStats statsWithInterval =
|
||||
serviceStats.get(serviceName)
|
||||
.toBuilder()
|
||||
.setClusterName(serviceName)
|
||||
stats.toBuilder()
|
||||
.setLoadReportInterval(Durations.fromNanos(expectedIntervalNano))
|
||||
.build();
|
||||
expectedStats.put(serviceName, statsWithInterval);
|
||||
expectedStats.put(statsWithInterval.getClusterName(), statsWithInterval);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -44,6 +44,7 @@ import org.junit.runners.JUnit4;
|
|||
/** Unit tests for {@link LoadStatsStore}. */
|
||||
@RunWith(JUnit4.class)
|
||||
public class LoadStatsStoreImplTest {
|
||||
private static final String CLUSTER_NAME = "cluster-test.googleapis.com";
|
||||
private static final Locality LOCALITY1 =
|
||||
new Locality("test_region1", "test_zone", "test_subzone");
|
||||
private static final Locality LOCALITY2 =
|
||||
|
|
@ -56,7 +57,8 @@ public class LoadStatsStoreImplTest {
|
|||
public void setUp() {
|
||||
localityLoadCounters = new ConcurrentHashMap<>();
|
||||
dropCounters = new ConcurrentHashMap<>();
|
||||
loadStatsStore = new LoadStatsStoreImpl(localityLoadCounters, dropCounters);
|
||||
loadStatsStore =
|
||||
new LoadStatsStoreImpl(CLUSTER_NAME, null, localityLoadCounters, dropCounters);
|
||||
}
|
||||
|
||||
private static List<EndpointLoadMetricStats> buildEndpointLoadMetricStatsList(
|
||||
|
|
@ -103,6 +105,7 @@ public class LoadStatsStoreImplTest {
|
|||
@Nullable List<UpstreamLocalityStats> upstreamLocalityStatsList,
|
||||
@Nullable List<DroppedRequests> droppedRequestsList) {
|
||||
ClusterStats.Builder clusterStatsBuilder = ClusterStats.newBuilder();
|
||||
clusterStatsBuilder.setClusterName(CLUSTER_NAME);
|
||||
if (upstreamLocalityStatsList != null) {
|
||||
clusterStatsBuilder.addAllUpstreamLocalityStats(upstreamLocalityStatsList);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -103,7 +103,6 @@ import java.util.Queue;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
|
|
@ -185,10 +184,9 @@ public class XdsClientImplTest {
|
|||
|
||||
private final Queue<StreamObserver<DiscoveryResponse>> responseObservers = new ArrayDeque<>();
|
||||
private final Queue<StreamObserver<DiscoveryRequest>> requestObservers = new ArrayDeque<>();
|
||||
private final AtomicBoolean callEnded = new AtomicBoolean(true);
|
||||
|
||||
private final AtomicBoolean adsEnded = new AtomicBoolean(true);
|
||||
private final Queue<LoadReportCall> loadReportCalls = new ArrayDeque<>();
|
||||
private final AtomicInteger runningLrsCalls = new AtomicInteger();
|
||||
private final AtomicBoolean lrsEnded = new AtomicBoolean(true);
|
||||
|
||||
@Mock
|
||||
private AggregatedDiscoveryServiceImplBase mockedDiscoveryService;
|
||||
|
|
@ -220,13 +218,13 @@ public class XdsClientImplTest {
|
|||
@Override
|
||||
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
|
||||
final StreamObserver<DiscoveryResponse> responseObserver) {
|
||||
assertThat(callEnded.get()).isTrue(); // ensure previous call was ended
|
||||
callEnded.set(false);
|
||||
assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended
|
||||
adsEnded.set(false);
|
||||
Context.current().addListener(
|
||||
new CancellationListener() {
|
||||
@Override
|
||||
public void cancelled(Context context) {
|
||||
callEnded.set(true);
|
||||
adsEnded.set(true);
|
||||
}
|
||||
}, MoreExecutors.directExecutor());
|
||||
responseObservers.offer(responseObserver);
|
||||
|
|
@ -243,7 +241,8 @@ public class XdsClientImplTest {
|
|||
@Override
|
||||
public StreamObserver<LoadStatsRequest> streamLoadStats(
|
||||
StreamObserver<LoadStatsResponse> responseObserver) {
|
||||
runningLrsCalls.getAndIncrement();
|
||||
assertThat(lrsEnded.get()).isTrue();
|
||||
lrsEnded.set(false);
|
||||
@SuppressWarnings("unchecked")
|
||||
StreamObserver<LoadStatsRequest> requestObserver = mock(StreamObserver.class);
|
||||
final LoadReportCall call = new LoadReportCall(requestObserver, responseObserver);
|
||||
|
|
@ -252,7 +251,7 @@ public class XdsClientImplTest {
|
|||
@Override
|
||||
public void cancelled(Context context) {
|
||||
call.cancelled = true;
|
||||
runningLrsCalls.getAndDecrement();
|
||||
lrsEnded.set(true);
|
||||
}
|
||||
}, MoreExecutors.directExecutor());
|
||||
loadReportCalls.offer(call);
|
||||
|
|
@ -290,17 +289,13 @@ public class XdsClientImplTest {
|
|||
// least one watcher is registered.
|
||||
assertThat(responseObservers).isEmpty();
|
||||
assertThat(requestObservers).isEmpty();
|
||||
|
||||
// Load reporting is not initiated until being invoked to do so.
|
||||
assertThat(loadReportCalls).isEmpty();
|
||||
assertThat(runningLrsCalls.get()).isEqualTo(0);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
xdsClient.shutdown();
|
||||
assertThat(callEnded.get()).isTrue();
|
||||
assertThat(runningLrsCalls.get()).isEqualTo(0);
|
||||
assertThat(adsEnded.get()).isTrue();
|
||||
assertThat(lrsEnded.get()).isTrue();
|
||||
assertThat(channel.isShutdown()).isTrue();
|
||||
assertThat(fakeClock.getPendingTasks()).isEmpty();
|
||||
}
|
||||
|
|
@ -3102,21 +3097,15 @@ public class XdsClientImplTest {
|
|||
*/
|
||||
@Test
|
||||
public void reportLoadStatsToServer() {
|
||||
xdsClient.reportClientStats("cluster-foo.googleapis.com", "");
|
||||
LoadReportCall lrsCall1 = loadReportCalls.poll();
|
||||
verify(lrsCall1.requestObserver)
|
||||
.onNext(eq(buildInitialLoadStatsRequest("cluster-foo.googleapis.com")));
|
||||
assertThat(lrsCall1.cancelled).isFalse();
|
||||
LoadStatsStore loadStatsStore = mock(LoadStatsStore.class);
|
||||
String clusterName = "cluster-foo.googleapis.com";
|
||||
xdsClient.reportClientStats(clusterName, null, loadStatsStore);
|
||||
LoadReportCall lrsCall = loadReportCalls.poll();
|
||||
verify(lrsCall.requestObserver).onNext(eq(buildInitialLoadStatsRequest(clusterName)));
|
||||
|
||||
xdsClient.reportClientStats("cluster-bar.googleapis.com", "");
|
||||
LoadReportCall lrsCall2 = loadReportCalls.poll();
|
||||
verify(lrsCall2.requestObserver)
|
||||
.onNext(eq(buildInitialLoadStatsRequest("cluster-bar.googleapis.com")));
|
||||
assertThat(lrsCall2.cancelled).isFalse();
|
||||
|
||||
xdsClient.cancelClientStatsReport("cluster-bar.googleapis.com");
|
||||
assertThat(lrsCall2.cancelled).isTrue();
|
||||
assertThat(runningLrsCalls.get()).isEqualTo(1);
|
||||
xdsClient.cancelClientStatsReport(clusterName, null);
|
||||
assertThat(lrsCall.cancelled).isTrue();
|
||||
// See more test on LoadReportClientTest.java
|
||||
}
|
||||
|
||||
// Simulates the use case of watching clusters/endpoints based on service config resolved by
|
||||
|
|
|
|||
Loading…
Reference in New Issue