diff --git a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java index 30963c1eb3..1b439dcd2b 100644 --- a/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/EdsLoadBalancer.java @@ -46,7 +46,6 @@ import io.grpc.xds.XdsClient.XdsChannelFactory; import io.grpc.xds.XdsClient.XdsClientFactory; import io.grpc.xds.XdsLoadBalancerProvider.XdsConfig; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -63,8 +62,6 @@ final class EdsLoadBalancer extends LoadBalancer { private final Bootstrapper bootstrapper; private final XdsChannelFactory channelFactory; private final Helper edsLbHelper; - // Cache for load stats stores for each service in cluster keyed by cluster service names. - private final Map loadStatsStoreMap = new HashMap<>(); // Most recent XdsConfig. @Nullable @@ -74,9 +71,10 @@ final class EdsLoadBalancer extends LoadBalancer { @Nullable private XdsClient xdsClient; @Nullable - private LoadReportClient loadReportClient; - @Nullable private String clusterName; + // FIXME(chengyuanzhang): should be one instance per cluster:cluster_service. + @Nullable + private LoadStatsStore loadStatsStore; EdsLoadBalancer(Helper edsLbHelper, ResourceUpdateCallback resourceUpdateCallback) { this( @@ -187,21 +185,22 @@ final class EdsLoadBalancer extends LoadBalancer { // TODO(zdapeng): Use the correct cluster name. Currently load reporting will be broken if // edsServiceName is changed because we are using edsServiceName for the cluster name. clusterName = clusterServiceName; + loadStatsStore = new LoadStatsStoreImpl(clusterName, null); } - boolean shouldReportStats = newXdsConfig.lrsServerName != null; - if (shouldReportStats && !isReportingStats()) { - // Start load reporting. This may be a restarting after previously stopping the load - // reporting, so need to re-add all the pre-existing loadStatsStores to the new - // loadReportClient. - loadReportClient = xdsClient.reportClientStats(clusterName, newXdsConfig.lrsServerName); - for (Map.Entry entry : loadStatsStoreMap.entrySet()) { - loadReportClient.addLoadStatsStore(entry.getKey(), entry.getValue()); + // FIXME(chengyuanzhang): should report loads for each cluster:cluster_service. + if (xdsConfig == null + || !Objects.equals(newXdsConfig.lrsServerName, xdsConfig.lrsServerName)) { + if (newXdsConfig.lrsServerName != null) { + if (!newXdsConfig.lrsServerName.equals("")) { + throw new AssertionError( + "Can only report load to the same management server"); + } + xdsClient.reportClientStats(clusterName, null, loadStatsStore); + } else if (xdsConfig != null) { + xdsClient.cancelClientStatsReport(clusterName, null); } } - if (!shouldReportStats && isReportingStats()) { - cancelClientStatsReport(); - } // Note: childPolicy change will be handled in LocalityStore, to be implemented. // If edsServiceName in XdsConfig is changed, do a graceful switch. @@ -232,25 +231,14 @@ final class EdsLoadBalancer extends LoadBalancer { public void shutdown() { channelLogger.log(ChannelLogLevel.DEBUG, "EDS load balancer is shutting down"); switchingLoadBalancer.shutdown(); - if (isReportingStats()) { - cancelClientStatsReport(); - } if (xdsClient != null) { + if (xdsConfig != null && xdsConfig.lrsServerName != null) { + xdsClient.cancelClientStatsReport(clusterName, null); + } xdsClient = xdsClientPool.returnObject(xdsClient); } } - /** Whether the client stats for the cluster is currently reported to the traffic director. */ - private boolean isReportingStats() { - return loadReportClient != null; - } - - /** Stops to report client stats for the cluster. */ - private void cancelClientStatsReport() { - xdsClient.cancelClientStatsReport(clusterName); - loadReportClient = null; - } - /** * A load balancer factory that provides a load balancer for a given cluster service. */ @@ -291,11 +279,6 @@ final class EdsLoadBalancer extends LoadBalancer { ClusterEndpointsBalancer(Helper helper) { this.helper = helper; - LoadStatsStore loadStatsStore = new LoadStatsStoreImpl(); - loadStatsStoreMap.put(clusterServiceName, loadStatsStore); - if (isReportingStats()) { - loadReportClient.addLoadStatsStore(clusterServiceName, loadStatsStore); - } localityStore = localityStoreFactory.newLocalityStore(helper, lbRegistry, loadStatsStore); endpointWatcher = new EndpointWatcherImpl(localityStore); @@ -320,10 +303,6 @@ final class EdsLoadBalancer extends LoadBalancer { @Override public void shutdown() { - loadStatsStoreMap.remove(clusterServiceName); - if (isReportingStats()) { - loadReportClient.removeLoadStatsStore(clusterServiceName); - } localityStore.reset(); xdsClient.cancelEndpointDataWatch(clusterServiceName, endpointWatcher); } diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index 3b9e343196..aa0c3584f7 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -16,53 +16,361 @@ package io.grpc.xds; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; +import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.api.v2.core.Node; +import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; +import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc; +import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest; +import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse; +import io.grpc.ManagedChannel; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.internal.BackoffPolicy; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; /** - * A {@link LoadReportClient} is the gRPC client's load reporting agent that establishes - * connections to traffic director for reporting load stats from gRPC client's perspective. - * - *

Each {@link LoadReportClient} instance is responsible for reporting loads for a single - * cluster. + * Client of xDS load reporting service based on LRS protocol, which reports load stats of + * gRPC client's perspective to a management server. */ @NotThreadSafe -interface LoadReportClient { +final class LoadReportClient { + private static final Logger logger = Logger.getLogger(XdsClientImpl.class.getName()); + + private final ManagedChannel channel; + private final Node node; + private final SynchronizationContext syncContext; + private final ScheduledExecutorService timerService; + private final Supplier stopwatchSupplier; + private final Stopwatch retryStopwatch; + private final BackoffPolicy.Provider backoffPolicyProvider; + + // Sources of load stats data for each cluster. + // FIXME(chengyuanzhang): this should be Map> as each + // ClusterStats is keyed by cluster:cluster_service. Currently, cluster_service is always unset. + private final Map loadStatsStoreMap = new HashMap<>(); + private boolean started; + + @Nullable + private BackoffPolicy lrsRpcRetryPolicy; + @Nullable + private ScheduledHandle lrsRpcRetryTimer; + @Nullable + private LrsStream lrsStream; + @Nullable + private LoadReportCallback callback; + + LoadReportClient( + ManagedChannel channel, + Node node, + SynchronizationContext syncContext, + ScheduledExecutorService scheduledExecutorService, + BackoffPolicy.Provider backoffPolicyProvider, + Supplier stopwatchSupplier) { + this.channel = checkNotNull(channel, "channel"); + this.node = checkNotNull(node, "node"); + this.syncContext = checkNotNull(syncContext, "syncContext"); + this.timerService = checkNotNull(scheduledExecutorService, "timeService"); + this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); + this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); + this.retryStopwatch = stopwatchSupplier.get(); + started = false; + } /** * Establishes load reporting communication and negotiates with traffic director to report load * stats periodically. Calling this method on an already started {@link LoadReportClient} is * no-op. - * - * @param callback containing methods to be invoked for passing information received from load - * reporting responses to xDS load balancer. */ - // TODO(chengyuanzhang): do not expose this method. - void startLoadReporting(LoadReportCallback callback); + public void startLoadReporting(LoadReportCallback callback) { + if (started) { + return; + } + this.callback = callback; + started = true; + startLrsRpc(); + } /** * Terminates load reporting. Calling this method on an already stopped * {@link LoadReportClient} is no-op. - * */ - // TODO(chengyuanzhang): do not expose this method. - void stopLoadReporting(); + public void stopLoadReporting() { + if (!started) { + return; + } + if (lrsRpcRetryTimer != null) { + lrsRpcRetryTimer.cancel(); + } + if (lrsStream != null) { + lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException()); + } + started = false; + // Do not shutdown channel as it is not owned by LrsClient. + } /** - * Provides this LoadReportClient source of load stats data for the given cluster service. - * If requested, data from the given {@code loadStatsStore} is periodically queried and - * sent to traffic director by this LoadReportClient. + * Provides this LoadReportClient source of load stats data for the given + * cluster:cluster_service. If requested, data from the given loadStatsStore is + * periodically queried and sent to traffic director by this LoadReportClient. * - * @param clusterServiceName name of the cluster service. - * @param loadStatsStore storage of load stats. + *

Currently we expect load stats data for all clusters to report loads for are provided + * before load reporting starts (so that LRS initial request tells management server clusters + * it is reporting loads for). Design TBD for reporting loads for extra clusters after load + * reporting has started. + * + *

Note: currently clusterServiceName is always unset. */ - void addLoadStatsStore(String clusterServiceName, LoadStatsStore loadStatsStore); + public void addLoadStatsStore( + String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) { + checkState( + !loadStatsStoreMap.containsKey(clusterName), + "load stats for cluster " + clusterName + " already exists"); + // FIXME(chengyuanzhang): relax this restriction after design is fleshed out. + checkState( + !started, + "load stats for all clusters to report loads for should be provided before " + + "load reporting has started"); + loadStatsStoreMap.put(clusterName, loadStatsStore); + } /** - * Stops providing load stats data for the given cluster service. + * Stops providing load stats data for the given cluster:cluster_service. * - * @param clusterServiceName name of the cluster service. + *

Note: currently clusterServiceName is always unset. */ - void removeLoadStatsStore(String clusterServiceName); + public void removeLoadStatsStore(String clusterName, @Nullable String clusterServiceName) { + checkState( + loadStatsStoreMap.containsKey(clusterName), + "load stats for cluster " + clusterName + " does not exist"); + loadStatsStoreMap.remove(clusterName); + } + + @VisibleForTesting + static class LoadReportingTask implements Runnable { + private final LrsStream stream; + + LoadReportingTask(LrsStream stream) { + this.stream = stream; + } + + @Override + public void run() { + stream.sendLoadReport(); + } + } + + @VisibleForTesting + class LrsRpcRetryTask implements Runnable { + + @Override + public void run() { + startLrsRpc(); + } + } + + private void startLrsRpc() { + checkState(lrsStream == null, "previous lbStream has not been cleared yet"); + LoadReportingServiceGrpc.LoadReportingServiceStub stub + = LoadReportingServiceGrpc.newStub(channel); + lrsStream = new LrsStream(stub, stopwatchSupplier.get()); + retryStopwatch.reset().start(); + lrsStream.start(); + } + + private class LrsStream implements StreamObserver { + + // Cluster to report loads for asked by management server. + final Set clusterNames = new HashSet<>(); + final LoadReportingServiceGrpc.LoadReportingServiceStub stub; + final Stopwatch reportStopwatch; + StreamObserver lrsRequestWriter; + boolean initialResponseReceived; + boolean closed; + long loadReportIntervalNano = -1; + ScheduledHandle loadReportTimer; + + LrsStream(LoadReportingServiceGrpc.LoadReportingServiceStub stub, Stopwatch stopwatch) { + this.stub = checkNotNull(stub, "stub"); + reportStopwatch = checkNotNull(stopwatch, "stopwatch"); + } + + void start() { + lrsRequestWriter = stub.withWaitForReady().streamLoadStats(this); + reportStopwatch.reset().start(); + // Tells management server which clusters the client is reporting loads for. + List clusterStatsList = new ArrayList<>(); + for (String clusterName : loadStatsStoreMap.keySet()) { + clusterStatsList.add(ClusterStats.newBuilder().setClusterName(clusterName).build()); + } + LoadStatsRequest initRequest = + LoadStatsRequest.newBuilder() + .setNode(node) + .addAllClusterStats(clusterStatsList) + .build(); + lrsRequestWriter.onNext(initRequest); + logger.log(Level.FINE, "Initial LRS request sent: {0}", initRequest); + } + + @Override + public void onNext(final LoadStatsResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleResponse(response); + } + }); + } + + @Override + public void onError(final Throwable t) { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleStreamClosed(Status.fromThrowable(t) + .augmentDescription("Stream to XDS management server had an error")); + } + }); + } + + @Override + public void onCompleted() { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleStreamClosed( + Status.UNAVAILABLE.withDescription("Stream to XDS management server was closed")); + } + }); + } + + private void sendLoadReport() { + long interval = reportStopwatch.elapsed(TimeUnit.NANOSECONDS); + reportStopwatch.reset().start(); + LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node); + for (String name : clusterNames) { + if (loadStatsStoreMap.containsKey(name)) { + LoadStatsStore loadStatsStore = loadStatsStoreMap.get(name); + ClusterStats report = + loadStatsStore.generateLoadReport() + .toBuilder() + .setLoadReportInterval(Durations.fromNanos(interval)) + .build(); + requestBuilder.addClusterStats(report); + } + } + LoadStatsRequest request = requestBuilder.build(); + lrsRequestWriter.onNext(request); + logger.log(Level.FINE, "Sent LoadStatsRequest\n{0}", request); + scheduleNextLoadReport(); + } + + private void scheduleNextLoadReport() { + // Cancel pending load report and reschedule with updated load reporting interval. + if (loadReportTimer != null && loadReportTimer.isPending()) { + loadReportTimer.cancel(); + loadReportTimer = null; + } + if (loadReportIntervalNano > 0) { + loadReportTimer = syncContext.schedule( + new LoadReportingTask(this), loadReportIntervalNano, TimeUnit.NANOSECONDS, + timerService); + } + } + + private void handleResponse(LoadStatsResponse response) { + if (closed) { + return; + } + + if (!initialResponseReceived) { + logger.log(Level.FINE, "Received LRS initial response: {0}", response); + initialResponseReceived = true; + } else { + logger.log(Level.FINE, "Received an LRS response: {0}", response); + } + loadReportIntervalNano = Durations.toNanos(response.getLoadReportingInterval()); + callback.onReportResponse(loadReportIntervalNano); + clusterNames.clear(); + clusterNames.addAll(response.getClustersList()); + scheduleNextLoadReport(); + } + + private void handleStreamClosed(Status status) { + checkArgument(!status.isOk(), "unexpected OK status"); + if (closed) { + return; + } + closed = true; + cleanUp(); + + long delayNanos = 0; + if (initialResponseReceived || lrsRpcRetryPolicy == null) { + // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence + // has never been initialized. + lrsRpcRetryPolicy = backoffPolicyProvider.get(); + } + // Backoff only when balancer wasn't working previously. + if (!initialResponseReceived) { + // The back-off policy determines the interval between consecutive RPC upstarts, thus the + // actual delay may be smaller than the value from the back-off policy, or even negative, + // depending how much time was spent in the previous RPC. + delayNanos = + lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS); + } + logger.log(Level.FINE, "LRS stream closed, backoff in {0} second(s)", + TimeUnit.NANOSECONDS.toSeconds(delayNanos <= 0 ? 0 : delayNanos)); + if (delayNanos <= 0) { + startLrsRpc(); + } else { + lrsRpcRetryTimer = + syncContext.schedule(new LrsRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, + timerService); + } + } + + private void close(@Nullable Exception error) { + if (closed) { + return; + } + closed = true; + cleanUp(); + if (error == null) { + lrsRequestWriter.onCompleted(); + } else { + lrsRequestWriter.onError(error); + } + } + + private void cleanUp() { + if (loadReportTimer != null) { + loadReportTimer.cancel(); + loadReportTimer = null; + } + if (lrsStream == this) { + lrsStream = null; + } + } + } /** * Callbacks for passing information received from client load reporting responses to xDS load diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClientImpl.java b/xds/src/main/java/io/grpc/xds/LoadReportClientImpl.java deleted file mode 100644 index ac5e6a89d1..0000000000 --- a/xds/src/main/java/io/grpc/xds/LoadReportClientImpl.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; -import com.google.common.base.Supplier; -import com.google.protobuf.util.Durations; -import io.envoyproxy.envoy.api.v2.core.Node; -import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; -import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc; -import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest; -import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse; -import io.grpc.ManagedChannel; -import io.grpc.Status; -import io.grpc.SynchronizationContext; -import io.grpc.SynchronizationContext.ScheduledHandle; -import io.grpc.internal.BackoffPolicy; -import io.grpc.stub.StreamObserver; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; - -/** - * Client of xDS load reporting service based on LRS protocol. - */ -@NotThreadSafe -final class LoadReportClientImpl implements LoadReportClient { - - // TODO(chengyuanzhang): use channel logger once XdsClientImpl migrates to use channel logger. - private static final Logger logger = Logger.getLogger(XdsClientImpl.class.getName()); - - private final String clusterName; - private final ManagedChannel channel; - private final Node node; - private final SynchronizationContext syncContext; - private final ScheduledExecutorService timerService; - private final Supplier stopwatchSupplier; - private final Stopwatch retryStopwatch; - private final BackoffPolicy.Provider backoffPolicyProvider; - - // Sources of load stats data for each service in cluster. - private final Map loadStatsStoreMap = new HashMap<>(); - private boolean started; - - @Nullable - private BackoffPolicy lrsRpcRetryPolicy; - @Nullable - private ScheduledHandle lrsRpcRetryTimer; - @Nullable - private LrsStream lrsStream; - @Nullable - private LoadReportCallback callback; - - LoadReportClientImpl(ManagedChannel channel, - String clusterName, - Node node, - SynchronizationContext syncContext, - ScheduledExecutorService scheduledExecutorService, - BackoffPolicy.Provider backoffPolicyProvider, - Supplier stopwatchSupplier) { - this.channel = checkNotNull(channel, "channel"); - this.clusterName = checkNotNull(clusterName, "clusterName"); - this.node = checkNotNull(node, "node"); - this.syncContext = checkNotNull(syncContext, "syncContext"); - this.timerService = checkNotNull(scheduledExecutorService, "timeService"); - this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); - this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); - this.retryStopwatch = stopwatchSupplier.get(); - started = false; - } - - @Override - public void startLoadReporting(LoadReportCallback callback) { - if (started) { - return; - } - this.callback = callback; - started = true; - startLrsRpc(); - } - - @Override - public void stopLoadReporting() { - if (!started) { - return; - } - if (lrsRpcRetryTimer != null) { - lrsRpcRetryTimer.cancel(); - } - if (lrsStream != null) { - lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException()); - } - started = false; - // Do not shutdown channel as it is not owned by LrsClient. - } - - @Override - public void addLoadStatsStore(String clusterServiceName, LoadStatsStore loadStatsStore) { - loadStatsStoreMap.put(clusterServiceName, loadStatsStore); - } - - @Override - public void removeLoadStatsStore(String clusterServiceName) { - loadStatsStoreMap.remove(clusterServiceName); - } - - @VisibleForTesting - static class LoadReportingTask implements Runnable { - private final LrsStream stream; - - LoadReportingTask(LrsStream stream) { - this.stream = stream; - } - - @Override - public void run() { - stream.sendLoadReport(); - } - } - - @VisibleForTesting - class LrsRpcRetryTask implements Runnable { - - @Override - public void run() { - startLrsRpc(); - } - } - - private void startLrsRpc() { - checkState(lrsStream == null, "previous lbStream has not been cleared yet"); - LoadReportingServiceGrpc.LoadReportingServiceStub stub - = LoadReportingServiceGrpc.newStub(channel); - lrsStream = new LrsStream(stub, stopwatchSupplier.get()); - retryStopwatch.reset().start(); - lrsStream.start(); - } - - private class LrsStream implements StreamObserver { - - // Cluster services to report loads for, instructed by LRS responses. - final Set clusterServiceNames = new HashSet<>(); - final LoadReportingServiceGrpc.LoadReportingServiceStub stub; - final Stopwatch reportStopwatch; - StreamObserver lrsRequestWriter; - boolean initialResponseReceived; - boolean closed; - long loadReportIntervalNano = -1; - ScheduledHandle loadReportTimer; - - LrsStream(LoadReportingServiceGrpc.LoadReportingServiceStub stub, Stopwatch stopwatch) { - this.stub = checkNotNull(stub, "stub"); - reportStopwatch = checkNotNull(stopwatch, "stopwatch"); - } - - void start() { - lrsRequestWriter = stub.withWaitForReady().streamLoadStats(this); - reportStopwatch.reset().start(); - LoadStatsRequest initRequest = - LoadStatsRequest.newBuilder() - .setNode(node) - .addClusterStats(ClusterStats.newBuilder().setClusterName(clusterName)) - .build(); - lrsRequestWriter.onNext(initRequest); - logger.log(Level.FINE, "Initial LRS request sent: {0}", initRequest); - } - - @Override - public void onNext(final LoadStatsResponse response) { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleResponse(response); - } - }); - } - - @Override - public void onError(final Throwable t) { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleStreamClosed(Status.fromThrowable(t) - .augmentDescription("Stream to XDS management server had an error")); - } - }); - } - - @Override - public void onCompleted() { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleStreamClosed( - Status.UNAVAILABLE.withDescription("Stream to XDS management server was closed")); - } - }); - } - - private void sendLoadReport() { - long interval = reportStopwatch.elapsed(TimeUnit.NANOSECONDS); - reportStopwatch.reset().start(); - LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node); - for (String serviceName : clusterServiceNames) { - if (loadStatsStoreMap.containsKey(serviceName)) { - LoadStatsStore loadStatsStore = loadStatsStoreMap.get(serviceName); - ClusterStats report = - loadStatsStore.generateLoadReport() - .toBuilder() - .setClusterName(serviceName) - .setLoadReportInterval(Durations.fromNanos(interval)) - .build(); - requestBuilder.addClusterStats(report); - } - } - LoadStatsRequest request = requestBuilder.build(); - lrsRequestWriter.onNext(request); - logger.log(Level.FINE, "Sent LoadStatsRequest\n{0}", request); - scheduleNextLoadReport(); - } - - private void scheduleNextLoadReport() { - // Cancel pending load report and reschedule with updated load reporting interval. - if (loadReportTimer != null && loadReportTimer.isPending()) { - loadReportTimer.cancel(); - loadReportTimer = null; - } - if (loadReportIntervalNano > 0) { - loadReportTimer = syncContext.schedule( - new LoadReportingTask(this), loadReportIntervalNano, TimeUnit.NANOSECONDS, - timerService); - } - } - - private void handleResponse(LoadStatsResponse response) { - if (closed) { - return; - } - - if (!initialResponseReceived) { - logger.log(Level.FINE, "Received LRS initial response: {0}", response); - initialResponseReceived = true; - } else { - logger.log(Level.FINE, "Received an LRS response: {0}", response); - } - loadReportIntervalNano = Durations.toNanos(response.getLoadReportingInterval()); - callback.onReportResponse(loadReportIntervalNano); - clusterServiceNames.clear(); - clusterServiceNames.addAll(response.getClustersList()); - scheduleNextLoadReport(); - } - - private void handleStreamClosed(Status status) { - checkArgument(!status.isOk(), "unexpected OK status"); - if (closed) { - return; - } - closed = true; - cleanUp(); - - long delayNanos = 0; - if (initialResponseReceived || lrsRpcRetryPolicy == null) { - // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence - // has never been initialized. - lrsRpcRetryPolicy = backoffPolicyProvider.get(); - } - // Backoff only when balancer wasn't working previously. - if (!initialResponseReceived) { - // The back-off policy determines the interval between consecutive RPC upstarts, thus the - // actual delay may be smaller than the value from the back-off policy, or even negative, - // depending how much time was spent in the previous RPC. - delayNanos = - lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS); - } - logger.log(Level.FINE, "LRS stream closed, backoff in {0} second(s)", - TimeUnit.NANOSECONDS.toSeconds(delayNanos <= 0 ? 0 : delayNanos)); - if (delayNanos <= 0) { - startLrsRpc(); - } else { - lrsRpcRetryTimer = - syncContext.schedule(new LrsRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, - timerService); - } - } - - private void close(@Nullable Exception error) { - if (closed) { - return; - } - closed = true; - cleanUp(); - if (error == null) { - lrsRequestWriter.onCompleted(); - } else { - lrsRequestWriter.onError(error); - } - } - - private void cleanUp() { - if (loadReportTimer != null) { - loadReportTimer.cancel(); - loadReportTimer = null; - } - if (lrsStream == this) { - lrsStream = null; - } - } - } -} diff --git a/xds/src/main/java/io/grpc/xds/LoadStatsStore.java b/xds/src/main/java/io/grpc/xds/LoadStatsStore.java index 86eeb07a23..76e430aa8d 100644 --- a/xds/src/main/java/io/grpc/xds/LoadStatsStore.java +++ b/xds/src/main/java/io/grpc/xds/LoadStatsStore.java @@ -21,14 +21,14 @@ import io.grpc.xds.EnvoyProtoData.Locality; import javax.annotation.Nullable; /** - * Interface for client side load stats store. An {@code LoadStatsStore} maintains load stats for - * a service cluster (i.e., GSLB service) exposed by traffic director from a gRPC client's - * perspective, including dropped calls instructed by traffic director. Load stats for endpoints - * (i.e., Google backends) are aggregated in locality granularity (i.e., Google cluster) while the - * numbers of dropped calls are aggregated in cluster granularity. + * Interface for client side load stats store. An {@code LoadStatsStore} maintains load stats per + * cluster:cluster_service exposed by traffic director from a gRPC client's perspective, + * including dropped calls. Load stats for endpoints (i.e., Google backends) are aggregated in + * locality granularity (i.e., Google cluster) while the numbers of dropped calls are aggregated + * in cluster:cluster_service granularity. * - *

An {@code LoadStatsStore} lives the same span of lifecycle as a cluster and - * only tracks loads for localities exposed by remote traffic director. A proper usage should be + *

An {@code LoadStatsStore} only tracks loads for localities exposed by remote traffic + * director. A proper usage should be * *

    *
  1. Let {@link LoadStatsStore} track the locality newly exposed by traffic director by @@ -41,10 +41,6 @@ import javax.annotation.Nullable; * *

    No locality information is needed for recording dropped calls since they are aggregated in * cluster granularity. - * - *

    Note implementations should only be responsible for keeping track of loads and generating - * load reports with load data, any load reporting information should be opaque to {@code - * LoadStatsStore} and be set outside. */ interface LoadStatsStore { @@ -61,7 +57,7 @@ interface LoadStatsStore { * reporting. * *

    This method is not thread-safe and should be called from the same synchronized context - * used by {@link XdsClient}. + * used by {@link LoadReportClient}. */ ClusterStats generateLoadReport(); @@ -70,10 +66,10 @@ interface LoadStatsStore { * endpoints in added localities will be recorded and included in generated load reports. * *

    This method needs to be called at locality updates only for newly assigned localities in - * balancer discovery responses before recording loads for those localities. + * endpoint discovery responses before recording loads for those localities. * *

    This method is not thread-safe and should be called from the same synchronized context - * used by {@link XdsClient}. + * used by {@link LoadReportClient}. */ void addLocality(Locality locality); @@ -88,7 +84,7 @@ interface LoadStatsStore { * waste and keep including zero-load upstream locality stats in generated load reports. * *

    This method is not thread-safe and should be called from the same synchronized context - * used by {@link XdsClient}. + * used by {@link LoadReportClient}. */ void removeLocality(Locality locality); diff --git a/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java b/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java index 0dfeca125b..37ff09d91b 100644 --- a/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java +++ b/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; /** @@ -40,19 +41,27 @@ import javax.annotation.concurrent.NotThreadSafe; */ @NotThreadSafe final class LoadStatsStoreImpl implements LoadStatsStore { - + private final String clusterName; + @Nullable + @SuppressWarnings("unused") + private final String clusterServiceName; private final ConcurrentMap localityLoadCounters; // Cluster level dropped request counts for each category decision made by xDS load balancer. private final ConcurrentMap dropCounters; - LoadStatsStoreImpl() { - this(new ConcurrentHashMap(), + LoadStatsStoreImpl(String clusterName, @Nullable String clusterServiceName) { + this(clusterName, clusterServiceName, new ConcurrentHashMap(), new ConcurrentHashMap()); } @VisibleForTesting - LoadStatsStoreImpl(ConcurrentMap localityLoadCounters, + LoadStatsStoreImpl( + String clusterName, + @Nullable String clusterServiceName, + ConcurrentMap localityLoadCounters, ConcurrentMap dropCounters) { + this.clusterName = checkNotNull(clusterName, "clusterName"); + this.clusterServiceName = clusterServiceName; this.localityLoadCounters = checkNotNull(localityLoadCounters, "localityLoadCounters"); this.dropCounters = checkNotNull(dropCounters, "dropCounters"); } @@ -60,6 +69,8 @@ final class LoadStatsStoreImpl implements LoadStatsStore { @Override public ClusterStats generateLoadReport() { ClusterStats.Builder statsBuilder = ClusterStats.newBuilder(); + statsBuilder.setClusterName(clusterName); + // TODO(chengyuangzhang): also set cluster_service_name if provided. for (Map.Entry entry : localityLoadCounters.entrySet()) { ClientLoadSnapshot snapshot = entry.getValue().snapshot(); UpstreamLocalityStats.Builder localityStatsBuilder = diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 288ad2fb92..3fc0fc0e36 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -417,16 +417,21 @@ abstract class XdsClient { } /** - * Starts reporting client load stats to a remote server for the given cluster. + * Report client load stats to a remote server for the given cluster:cluster_service. + * + *

    Note: currently we can only report loads for a single cluster:cluster_service, + * as the design for adding clusters to report loads for while load reporting is + * happening is undefined. */ - LoadReportClient reportClientStats(String clusterName, String serverUri) { + void reportClientStats( + String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) { throw new UnsupportedOperationException(); } /** - * Stops reporting client load stats to the remote server for the given cluster. + * Stops reporting client load stats to the remote server for the given cluster:cluster_service. */ - void cancelClientStatsReport(String clusterName) { + void cancelClientStatsReport(String clusterName, @Nullable String clusterServiceName) { } abstract static class XdsClientFactory { diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index dea30bcc9a..257404e395 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -124,9 +124,6 @@ final class XdsClientImpl extends XdsClient { // watchers can watch endpoints in the same cluster. private final Map> endpointWatchers = new HashMap<>(); - // Load reporting clients, with each responsible for reporting loads of a single cluster. - private final Map lrsClients = new HashMap<>(); - // Resource fetch timers are used to conclude absence of resources. Each timer is activated when // subscription for the resource starts and disarmed on first update for the resource. @@ -150,6 +147,8 @@ final class XdsClientImpl extends XdsClient { private BackoffPolicy retryBackoffPolicy; @Nullable private ScheduledHandle rpcRetryTimer; + @Nullable + private LoadReportClient lrsClient; // Following fields are set only after the ConfigWatcher registered. Once set, they should // never change. @@ -189,8 +188,9 @@ final class XdsClientImpl extends XdsClient { adsStream.close(Status.CANCELLED.withDescription("shutdown").asException()); } cleanUpResources(); - for (LoadReportClientImpl lrsClient : lrsClients.values()) { + if (lrsClient != null) { lrsClient.stopLoadReporting(); + lrsClient = null; } if (rpcRetryTimer != null) { rpcRetryTimer.cancel(); @@ -405,37 +405,31 @@ final class XdsClientImpl extends XdsClient { } @Override - LoadReportClient reportClientStats(String clusterName, String serverUri) { - checkNotNull(serverUri, "serverUri"); - checkArgument(serverUri.equals(""), - "Currently only support empty serverUri, which defaults to the same " - + "management server this client talks to."); - if (!lrsClients.containsKey(clusterName)) { - LoadReportClientImpl lrsClient = - new LoadReportClientImpl( - channel, - clusterName, - node, - syncContext, - timeService, - backoffPolicyProvider, - stopwatchSupplier); - lrsClient.startLoadReporting( - new LoadReportCallback() { - @Override - public void onReportResponse(long reportIntervalNano) {} - }); - lrsClients.put(clusterName, lrsClient); - } - return lrsClients.get(clusterName); + void reportClientStats( + String clusterName, @Nullable String clusterServiceName, LoadStatsStore loadStatsStore) { + checkState(lrsClient == null, + "load reporting has already started, cannot change clusters to report loads for"); + lrsClient = + new LoadReportClient( + channel, + node, + syncContext, + timeService, + backoffPolicyProvider, + stopwatchSupplier); + lrsClient.addLoadStatsStore(clusterName, clusterServiceName, loadStatsStore); + lrsClient.startLoadReporting(new LoadReportCallback() { + @Override + public void onReportResponse(long reportIntervalNano) {} + }); } @Override - void cancelClientStatsReport(String clusterName) { - LoadReportClientImpl lrsClient = lrsClients.remove(clusterName); - if (lrsClient != null) { - lrsClient.stopLoadReporting(); - } + void cancelClientStatsReport(String clusterName, @Nullable String clusterServiceName) { + checkState(lrsClient != null, "load reporting was never started"); + lrsClient.removeLoadStatsStore(clusterName, clusterServiceName); + lrsClient.stopLoadReporting(); + lrsClient = null; } /** diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientImplTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java similarity index 78% rename from xds/src/test/java/io/grpc/xds/LoadReportClientImplTest.java rename to xds/src/test/java/io/grpc/xds/LoadReportClientTest.java index 633e2799d9..09a726104e 100644 --- a/xds/src/test/java/io/grpc/xds/LoadReportClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java @@ -29,7 +29,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.util.Durations; @@ -54,6 +53,8 @@ import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.LoadReportClient.LoadReportCallback; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,19 +75,17 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; /** - * Unit tests for {@link LoadReportClientImpl}. + * Unit tests for {@link LoadReportClient}. */ @RunWith(JUnit4.class) -public class LoadReportClientImplTest { - - private static final String CLUSTER_NAME = "foo.blade.googleapis.com"; +public class LoadReportClientTest { private static final Node NODE = Node.newBuilder().setId("LRS test").build(); private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER = new FakeClock.TaskFilter() { @Override public boolean shouldAccept(Runnable command) { return command.toString() - .contains(LoadReportClientImpl.LoadReportingTask.class.getSimpleName()); + .contains(LoadReportClient.LoadReportingTask.class.getSimpleName()); } }; private static final FakeClock.TaskFilter LRS_RPC_RETRY_TASK_FILTER = @@ -94,14 +93,9 @@ public class LoadReportClientImplTest { @Override public boolean shouldAccept(Runnable command) { return command.toString() - .contains(LoadReportClientImpl.LrsRpcRetryTask.class.getSimpleName()); + .contains(LoadReportClient.LrsRpcRetryTask.class.getSimpleName()); } }; - private static final LoadStatsRequest EXPECTED_INITIAL_REQ = - LoadStatsRequest.newBuilder() - .setNode(NODE) - .addClusterStats(ClusterStats.newBuilder().setClusterName(CLUSTER_NAME)) - .build(); @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); @@ -134,7 +128,7 @@ public class LoadReportClientImplTest { private LoadReportingServiceGrpc.LoadReportingServiceImplBase mockLoadReportingService; private ManagedChannel channel; - private LoadReportClientImpl lrsClient; + private LoadReportClient lrsClient; @SuppressWarnings("unchecked") @Before @@ -172,14 +166,12 @@ public class LoadReportClientImplTest { when(backoffPolicy2.nextBackoffNanos()) .thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L)); lrsClient = - new LoadReportClientImpl( + new LoadReportClient( channel, - CLUSTER_NAME, NODE, syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider, fakeClock.getStopwatchSupplier()); - lrsClient.startLoadReporting(callback); } @After @@ -190,21 +182,29 @@ public class LoadReportClientImplTest { @Test public void typicalWorkflow() { + String cluster1 = "cluster-foo.googleapis.com"; + String cluster2 = "cluster-bar.googleapis.com"; + ClusterStats rawStats1 = generateClusterLoadStats(cluster1); + ClusterStats rawStats2 = generateClusterLoadStats(cluster2); + when(loadStatsStore1.generateLoadReport()).thenReturn(rawStats1); + when(loadStatsStore2.generateLoadReport()).thenReturn(rawStats2); + lrsClient.addLoadStatsStore(cluster1, null, loadStatsStore1); + lrsClient.addLoadStatsStore(cluster2, null, loadStatsStore2); + lrsClient.startLoadReporting(callback); + verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture()); StreamObserver responseObserver = lrsResponseObserverCaptor.getValue(); StreamObserver requestObserver = Iterables.getOnlyElement(lrsRequestObservers); - InOrder inOrder = inOrder(requestObserver); - inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ); + InOrder inOrder = inOrder(requestObserver, callback); + inOrder.verify(requestObserver).onNext(eq(buildInitialRequest(cluster1, cluster2))); - String service1 = "namespace-foo:service-blade"; - ClusterStats rawStats1 = generateServiceLoadStats(); - when(loadStatsStore1.generateLoadReport()).thenReturn(rawStats1); - lrsClient.addLoadStatsStore(service1, loadStatsStore1); - responseObserver.onNext(buildLrsResponse(ImmutableList.of(service1), 1000)); + // Management server asks to report loads for cluster1. + responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 1000)); + inOrder.verify(callback).onReportResponse(1000); ArgumentMatcher expectedLoadReportMatcher = - new LoadStatsRequestMatcher(ImmutableMap.of(service1, rawStats1), 1000); + new LoadStatsRequestMatcher(ImmutableList.of(rawStats1), 1000); fakeClock.forwardNanos(999); inOrder.verifyNoMoreInteractions(); fakeClock.forwardNanos(1); @@ -214,48 +214,55 @@ public class LoadReportClientImplTest { inOrder.verify(requestObserver).onNext(argThat(expectedLoadReportMatcher)); // Management server updates the interval of sending load reports. - responseObserver.onNext(buildLrsResponse(ImmutableList.of(service1), 2000)); + responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1), 2000)); + inOrder.verify(callback).onReportResponse(2000); fakeClock.forwardNanos(1000); inOrder.verifyNoMoreInteractions(); fakeClock.forwardNanos(1000); inOrder.verify(requestObserver) - .onNext(argThat(new LoadStatsRequestMatcher(ImmutableMap.of(service1, rawStats1), 2000))); + .onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(rawStats1), 2000))); - String service2 = "namespace-bar:service-baz"; - ClusterStats rawStats2 = generateServiceLoadStats(); - when(loadStatsStore2.generateLoadReport()).thenReturn(rawStats2); - lrsClient.addLoadStatsStore(service2, loadStatsStore2); - - // Management server asks to report loads for an extra cluster service. - responseObserver.onNext(buildLrsResponse(ImmutableList.of(service1, service2), 2000)); + // Management server asks to report loads for cluster1 and cluster2. + responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster1, cluster2), 2000)); + inOrder.verify(callback).onReportResponse(2000); fakeClock.forwardNanos(2000); inOrder.verify(requestObserver) .onNext( argThat( - new LoadStatsRequestMatcher( - ImmutableMap.of(service1, rawStats1, service2, rawStats2), 2000))); + new LoadStatsRequestMatcher(ImmutableList.of(rawStats1, rawStats2), 2000))); - // Load reports for one of existing service is no longer wanted. - responseObserver.onNext(buildLrsResponse(ImmutableList.of(service2), 2000)); + // Load reports for cluster1 is no longer wanted. + responseObserver.onNext(buildLrsResponse(ImmutableList.of(cluster2), 2000)); + inOrder.verify(callback).onReportResponse(2000); fakeClock.forwardNanos(2000); inOrder.verify(requestObserver) - .onNext(argThat(new LoadStatsRequestMatcher(ImmutableMap.of(service2, rawStats2), 2000))); + .onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(rawStats2), 2000))); - // Management server asks loads for a cluster service that client has no load data. - responseObserver.onNext(buildLrsResponse(ImmutableList.of("namespace-ham:service-spam"), 2000)); + // Management server asks loads for a cluster that client has no load data. + responseObserver + .onNext(buildLrsResponse(ImmutableList.of("cluster-unknown.googleapis.com"), 2000)); + inOrder.verify(callback).onReportResponse(2000); fakeClock.forwardNanos(2000); ArgumentCaptor reportCaptor = ArgumentCaptor.forClass(null); inOrder.verify(requestObserver).onNext(reportCaptor.capture()); assertThat(reportCaptor.getValue().getClusterStatsCount()).isEqualTo(0); + + inOrder.verifyNoMoreInteractions(); } @Test public void lrsStreamClosedAndRetried() { + String clusterName = "cluster-foo.googleapis.com"; + ClusterStats stats = generateClusterLoadStats(clusterName); + when(loadStatsStore1.generateLoadReport()).thenReturn(stats); + lrsClient.addLoadStatsStore(clusterName, null, loadStatsStore1); + lrsClient.startLoadReporting(callback); + InOrder inOrder = inOrder(mockLoadReportingService, backoffPolicyProvider, backoffPolicy1, backoffPolicy2); inOrder.verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture()); @@ -264,7 +271,7 @@ public class LoadReportClientImplTest { StreamObserver requestObserver = lrsRequestObservers.poll(); // First balancer RPC - verify(requestObserver).onNext(EXPECTED_INITIAL_REQ); + verify(requestObserver).onNext(eq(buildInitialRequest(clusterName))); assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER)); // Balancer closes it immediately (erroneously) @@ -284,7 +291,7 @@ public class LoadReportClientImplTest { responseObserver = lrsResponseObserverCaptor.getValue(); assertThat(lrsRequestObservers).hasSize(1); requestObserver = lrsRequestObservers.poll(); - verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ)); + verify(requestObserver).onNext(eq(buildInitialRequest(clusterName))); assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER)); // Balancer closes it with an error. @@ -303,13 +310,12 @@ public class LoadReportClientImplTest { responseObserver = lrsResponseObserverCaptor.getValue(); assertThat(lrsRequestObservers).hasSize(1); requestObserver = lrsRequestObservers.poll(); - verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ)); + verify(requestObserver).onNext(eq(buildInitialRequest(clusterName))); assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER)); - // Balancer sends a response asking for loads of some cluster service. - String serviceName = "namespace-foo:service-blade"; + // Balancer sends a response asking for loads of the cluster. responseObserver - .onNext(buildLrsResponse(ImmutableList.of(serviceName), 0)); + .onNext(buildLrsResponse(ImmutableList.of(clusterName), 0)); // Then breaks the RPC responseObserver.onError(Status.UNAVAILABLE.asException()); @@ -320,7 +326,7 @@ public class LoadReportClientImplTest { responseObserver = lrsResponseObserverCaptor.getValue(); assertThat(lrsRequestObservers).hasSize(1); requestObserver = lrsRequestObservers.poll(); - verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ)); + verify(requestObserver).onNext(eq(buildInitialRequest(clusterName))); // Fail the retry after spending 4ns fakeClock.forwardNanos(4); @@ -338,19 +344,16 @@ public class LoadReportClientImplTest { inOrder.verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture()); assertThat(lrsRequestObservers).hasSize(1); requestObserver = lrsRequestObservers.poll(); - verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ)); + verify(requestObserver).onNext(eq(buildInitialRequest(clusterName))); assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER)); // Load reporting back to normal. responseObserver = lrsResponseObserverCaptor.getValue(); - ClusterStats stats = generateServiceLoadStats(); - when(loadStatsStore1.generateLoadReport()).thenReturn(stats); - lrsClient.addLoadStatsStore(serviceName, loadStatsStore1); responseObserver - .onNext(buildLrsResponse(ImmutableList.of(serviceName), 10)); + .onNext(buildLrsResponse(ImmutableList.of(clusterName), 10)); fakeClock.forwardNanos(10); verify(requestObserver) - .onNext(argThat(new LoadStatsRequestMatcher(ImmutableMap.of(serviceName, stats), 10))); + .onNext(argThat(new LoadStatsRequestMatcher(ImmutableList.of(stats), 10))); // Wrapping up verify(backoffPolicyProvider, times(2)).get(); @@ -360,13 +363,19 @@ public class LoadReportClientImplTest { @Test public void raceBetweenLoadReportingAndLbStreamClosure() { + String clusterName = "cluster-foo.googleapis.com"; + ClusterStats stats = generateClusterLoadStats(clusterName); + when(loadStatsStore1.generateLoadReport()).thenReturn(stats); + lrsClient.addLoadStatsStore(clusterName, null, loadStatsStore1); + lrsClient.startLoadReporting(callback); + verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture()); StreamObserver responseObserver = lrsResponseObserverCaptor.getValue(); assertThat(lrsRequestObservers).hasSize(1); StreamObserver requestObserver = lrsRequestObservers.poll(); // First balancer RPC - verify(requestObserver).onNext(EXPECTED_INITIAL_REQ); + verify(requestObserver).onNext(eq(buildInitialRequest(clusterName))); assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER)); // Simulate receiving a response from traffic director. @@ -394,19 +403,28 @@ public class LoadReportClientImplTest { } private static LoadStatsResponse buildLrsResponse( - List clusterServiceNames, long loadReportIntervalNanos) { + List clusterNames, long loadReportIntervalNanos) { return LoadStatsResponse .newBuilder() - .addAllClusters(clusterServiceNames) + .addAllClusters(clusterNames) .setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNanos)) .build(); } + private static LoadStatsRequest buildInitialRequest(String... clusters) { + List clusterStatsList = new ArrayList<>(); + for (String cluster : clusters) { + clusterStatsList.add(ClusterStats.newBuilder().setClusterName(cluster).build()); + } + return + LoadStatsRequest.newBuilder().setNode(NODE).addAllClusterStats(clusterStatsList).build(); + } + /** * Generates a raw service load stats report with random data. */ - private static ClusterStats generateServiceLoadStats() { + private static ClusterStats generateClusterLoadStats(String clusterName) { long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long callsSucceeded = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); long callsFailed = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); @@ -416,6 +434,7 @@ public class LoadReportClientImplTest { return ClusterStats.newBuilder() + .setClusterName(clusterName) .addUpstreamLocalityStats( UpstreamLocalityStats.newBuilder() .setLocality( @@ -440,21 +459,18 @@ public class LoadReportClientImplTest { } /** - * For comparing LoadStatsRequest based on a collection of raw service load stats. + * For comparing LoadStatsRequest stats data regardless of . */ private static class LoadStatsRequestMatcher implements ArgumentMatcher { private final Map expectedStats = new HashMap<>(); - LoadStatsRequestMatcher(Map serviceStats, long expectedIntervalNano) { - for (String serviceName : serviceStats.keySet()) { - // TODO(chengyuanzhang): the field to be populated should be cluster_service_name. + LoadStatsRequestMatcher(Collection clusterStats, long expectedIntervalNano) { + for (ClusterStats stats : clusterStats) { ClusterStats statsWithInterval = - serviceStats.get(serviceName) - .toBuilder() - .setClusterName(serviceName) + stats.toBuilder() .setLoadReportInterval(Durations.fromNanos(expectedIntervalNano)) .build(); - expectedStats.put(serviceName, statsWithInterval); + expectedStats.put(statsWithInterval.getClusterName(), statsWithInterval); } } diff --git a/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java b/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java index af6c4d8ac2..d58fd4bb92 100644 --- a/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java @@ -44,6 +44,7 @@ import org.junit.runners.JUnit4; /** Unit tests for {@link LoadStatsStore}. */ @RunWith(JUnit4.class) public class LoadStatsStoreImplTest { + private static final String CLUSTER_NAME = "cluster-test.googleapis.com"; private static final Locality LOCALITY1 = new Locality("test_region1", "test_zone", "test_subzone"); private static final Locality LOCALITY2 = @@ -56,7 +57,8 @@ public class LoadStatsStoreImplTest { public void setUp() { localityLoadCounters = new ConcurrentHashMap<>(); dropCounters = new ConcurrentHashMap<>(); - loadStatsStore = new LoadStatsStoreImpl(localityLoadCounters, dropCounters); + loadStatsStore = + new LoadStatsStoreImpl(CLUSTER_NAME, null, localityLoadCounters, dropCounters); } private static List buildEndpointLoadMetricStatsList( @@ -103,6 +105,7 @@ public class LoadStatsStoreImplTest { @Nullable List upstreamLocalityStatsList, @Nullable List droppedRequestsList) { ClusterStats.Builder clusterStatsBuilder = ClusterStats.newBuilder(); + clusterStatsBuilder.setClusterName(CLUSTER_NAME); if (upstreamLocalityStatsList != null) { clusterStatsBuilder.addAllUpstreamLocalityStats(upstreamLocalityStatsList); } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index 55dccdd6fe..907f57f467 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -103,7 +103,6 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -185,10 +184,9 @@ public class XdsClientImplTest { private final Queue> responseObservers = new ArrayDeque<>(); private final Queue> requestObservers = new ArrayDeque<>(); - private final AtomicBoolean callEnded = new AtomicBoolean(true); - + private final AtomicBoolean adsEnded = new AtomicBoolean(true); private final Queue loadReportCalls = new ArrayDeque<>(); - private final AtomicInteger runningLrsCalls = new AtomicInteger(); + private final AtomicBoolean lrsEnded = new AtomicBoolean(true); @Mock private AggregatedDiscoveryServiceImplBase mockedDiscoveryService; @@ -220,13 +218,13 @@ public class XdsClientImplTest { @Override public StreamObserver streamAggregatedResources( final StreamObserver responseObserver) { - assertThat(callEnded.get()).isTrue(); // ensure previous call was ended - callEnded.set(false); + assertThat(adsEnded.get()).isTrue(); // ensure previous call was ended + adsEnded.set(false); Context.current().addListener( new CancellationListener() { @Override public void cancelled(Context context) { - callEnded.set(true); + adsEnded.set(true); } }, MoreExecutors.directExecutor()); responseObservers.offer(responseObserver); @@ -243,7 +241,8 @@ public class XdsClientImplTest { @Override public StreamObserver streamLoadStats( StreamObserver responseObserver) { - runningLrsCalls.getAndIncrement(); + assertThat(lrsEnded.get()).isTrue(); + lrsEnded.set(false); @SuppressWarnings("unchecked") StreamObserver requestObserver = mock(StreamObserver.class); final LoadReportCall call = new LoadReportCall(requestObserver, responseObserver); @@ -252,7 +251,7 @@ public class XdsClientImplTest { @Override public void cancelled(Context context) { call.cancelled = true; - runningLrsCalls.getAndDecrement(); + lrsEnded.set(true); } }, MoreExecutors.directExecutor()); loadReportCalls.offer(call); @@ -290,17 +289,13 @@ public class XdsClientImplTest { // least one watcher is registered. assertThat(responseObservers).isEmpty(); assertThat(requestObservers).isEmpty(); - - // Load reporting is not initiated until being invoked to do so. - assertThat(loadReportCalls).isEmpty(); - assertThat(runningLrsCalls.get()).isEqualTo(0); } @After public void tearDown() { xdsClient.shutdown(); - assertThat(callEnded.get()).isTrue(); - assertThat(runningLrsCalls.get()).isEqualTo(0); + assertThat(adsEnded.get()).isTrue(); + assertThat(lrsEnded.get()).isTrue(); assertThat(channel.isShutdown()).isTrue(); assertThat(fakeClock.getPendingTasks()).isEmpty(); } @@ -3102,21 +3097,15 @@ public class XdsClientImplTest { */ @Test public void reportLoadStatsToServer() { - xdsClient.reportClientStats("cluster-foo.googleapis.com", ""); - LoadReportCall lrsCall1 = loadReportCalls.poll(); - verify(lrsCall1.requestObserver) - .onNext(eq(buildInitialLoadStatsRequest("cluster-foo.googleapis.com"))); - assertThat(lrsCall1.cancelled).isFalse(); + LoadStatsStore loadStatsStore = mock(LoadStatsStore.class); + String clusterName = "cluster-foo.googleapis.com"; + xdsClient.reportClientStats(clusterName, null, loadStatsStore); + LoadReportCall lrsCall = loadReportCalls.poll(); + verify(lrsCall.requestObserver).onNext(eq(buildInitialLoadStatsRequest(clusterName))); - xdsClient.reportClientStats("cluster-bar.googleapis.com", ""); - LoadReportCall lrsCall2 = loadReportCalls.poll(); - verify(lrsCall2.requestObserver) - .onNext(eq(buildInitialLoadStatsRequest("cluster-bar.googleapis.com"))); - assertThat(lrsCall2.cancelled).isFalse(); - - xdsClient.cancelClientStatsReport("cluster-bar.googleapis.com"); - assertThat(lrsCall2.cancelled).isTrue(); - assertThat(runningLrsCalls.get()).isEqualTo(1); + xdsClient.cancelClientStatsReport(clusterName, null); + assertThat(lrsCall.cancelled).isTrue(); + // See more test on LoadReportClientTest.java } // Simulates the use case of watching clusters/endpoints based on service config resolved by