From 96da68bab3338b350cd79ed48c225a366b37ff6a Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Wed, 4 Dec 2019 14:22:30 -0800 Subject: [PATCH] xds: modify LRS client interface (#6483) Add two APIs to LoadReportClient interface: - addLoadStatsStore(String, LoadStatsStore) - removeLoadStatsStore(String) Each LoadReportClient is responsible for reporting loads for a single cluster. But a gRPC client can spread loads to multiple EDS services per cluster, while loads for each EDS service should be aggregated separately. With this change, starting a LoadReportClient only starts LRS RPC, the source of load data to be reported is added via addLoadStatsStore API. --- .../java/io/grpc/xds/LoadReportClient.java | 35 ++++++++++++------- .../io/grpc/xds/LoadReportClientImpl.java | 15 +++++--- xds/src/main/java/io/grpc/xds/XdsClient.java | 5 ++- 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index cdab17e1cf..3b9e343196 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -19,40 +19,51 @@ package io.grpc.xds; import javax.annotation.concurrent.NotThreadSafe; /** - * An {@link LoadReportClient} is the gRPC client's load reporting agent that establishes + * A {@link LoadReportClient} is the gRPC client's load reporting agent that establishes * connections to traffic director for reporting load stats from gRPC client's perspective. * - *

Its operations should be self-contained and running independently along with xDS load - * balancer's load balancing protocol, although it shares the same channel to traffic director with - * xDS load balancer's load balancing protocol. - * - *

Its lifecycle is managed by the high-level xDS load balancer. + *

Each {@link LoadReportClient} instance is responsible for reporting loads for a single + * cluster. */ @NotThreadSafe interface LoadReportClient { /** - * Establishes load reporting communication and negotiates with the remote balancer to report load + * Establishes load reporting communication and negotiates with traffic director to report load * stats periodically. Calling this method on an already started {@link LoadReportClient} is * no-op. * - *

This method is not thread-safe and should be called from the same synchronized context - * returned by {@link XdsLoadBalancer2.Helper#getSynchronizationContext}. - * * @param callback containing methods to be invoked for passing information received from load * reporting responses to xDS load balancer. */ + // TODO(chengyuanzhang): do not expose this method. void startLoadReporting(LoadReportCallback callback); /** * Terminates load reporting. Calling this method on an already stopped * {@link LoadReportClient} is no-op. * - *

This method is not thread-safe and should be called from the same synchronized context - * returned by {@link XdsLoadBalancer2.Helper#getSynchronizationContext}. */ + // TODO(chengyuanzhang): do not expose this method. void stopLoadReporting(); + /** + * Provides this LoadReportClient source of load stats data for the given cluster service. + * If requested, data from the given {@code loadStatsStore} is periodically queried and + * sent to traffic director by this LoadReportClient. + * + * @param clusterServiceName name of the cluster service. + * @param loadStatsStore storage of load stats. + */ + void addLoadStatsStore(String clusterServiceName, LoadStatsStore loadStatsStore); + + /** + * Stops providing load stats data for the given cluster service. + * + * @param clusterServiceName name of the cluster service. + */ + void removeLoadStatsStore(String clusterServiceName); + /** * Callbacks for passing information received from client load reporting responses to xDS load * balancer, such as the load reporting interval requested by the traffic director. diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClientImpl.java b/xds/src/main/java/io/grpc/xds/LoadReportClientImpl.java index 753b9e2741..935ae6b35a 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClientImpl.java @@ -50,10 +50,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; /** - * Client of xDS load reporting service. - * - *

Methods in this class are expected to be called in the same synchronized context that {@link - * XdsLoadBalancer2.Helper#getSynchronizationContext} returns. + * Client of xDS load reporting service based on LRS protocol. */ @NotThreadSafe final class LoadReportClientImpl implements LoadReportClient { @@ -133,6 +130,16 @@ final class LoadReportClientImpl implements LoadReportClient { // Do not shutdown channel as it is not owned by LrsClient. } + @Override + public void addLoadStatsStore(String clusterServiceName, LoadStatsStore loadStatsStore) { + // TODO(chengyuanzhang): to be implemented. + } + + @Override + public void removeLoadStatsStore(String clusterServiceName) { + // TODO(chengyuanzhang): to be implemented. + } + @VisibleForTesting static class LoadReportingTask implements Runnable { private final LrsStream stream; diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 4086455662..925ed2de35 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -396,10 +396,9 @@ abstract class XdsClient { /** * Starts reporting client load stats to a remote server for the given cluster. - * - * @param loadStatsStore a in-memory data store containing loads recorded by gRPC client. */ - void reportClientStats(String clusterName, String serverUri, LoadStatsStore loadStatsStore) { + LoadReportClient reportClientStats(String clusterName, String serverUri) { + throw new UnsupportedOperationException(); } /**