diff --git a/xds/src/main/java/io/grpc/xds/ClientLoadCounter.java b/xds/src/main/java/io/grpc/xds/ClientLoadCounter.java index c574a7ab2f..19bce770b5 100644 --- a/xds/src/main/java/io/grpc/xds/ClientLoadCounter.java +++ b/xds/src/main/java/io/grpc/xds/ClientLoadCounter.java @@ -39,7 +39,8 @@ import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.ThreadSafe; /** - * Client side aggregator for load stats. + * Client side load stats recorder that provides RPC counting and metrics recording as name-value + * pairs. * *

All methods except {@link #snapshot()} in this class are thread-safe. */ @@ -97,12 +98,14 @@ final class ClientLoadCounter { } /** - * Generate snapshot for recorded query counts and metrics since previous snapshot. + * Generates a snapshot for load stats recorded in this counter. Successive snapshots represent + * load stats recorded for the interval since the previous snapshot. So taking a snapshot clears + * the counter state except for ongoing RPC recordings. * *

This method is not thread-safe and must be called from {@link * io.grpc.LoadBalancer.Helper#getSynchronizationContext()}. */ - public ClientLoadSnapshot snapshot() { + ClientLoadSnapshot snapshot() { Map aggregatedValues = new HashMap<>(); for (MetricRecorder recorder : metricRecorders) { Map map = recorder.takeAll(); @@ -133,8 +136,8 @@ final class ClientLoadCounter { } /** - * A {@link ClientLoadSnapshot} represents a snapshot of {@link ClientLoadCounter} to be sent as - * part of {@link io.envoyproxy.envoy.api.v2.endpoint.ClusterStats} to the balancer. + * A {@link ClientLoadSnapshot} represents a snapshot of {@link ClientLoadCounter}, which is a + * read-only copy of load stats recorded for some period of time. */ static final class ClientLoadSnapshot { @@ -259,8 +262,8 @@ final class ClientLoadCounter { } /** - * An {@link LoadRecordingStreamTracerFactory} instance records and aggregates client-side load - * data into an {@link ClientLoadCounter} object. + * An {@link LoadRecordingStreamTracerFactory} instance for creating client stream tracers that + * records and aggregates client-side load data into an {@link ClientLoadCounter} object. */ @ThreadSafe @VisibleForTesting diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java similarity index 75% rename from xds/src/main/java/io/grpc/xds/XdsLoadReportClient.java rename to xds/src/main/java/io/grpc/xds/LoadReportClient.java index 20ed897025..6c7b39dee4 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -19,16 +19,21 @@ package io.grpc.xds; import javax.annotation.concurrent.NotThreadSafe; /** - * An {@link XdsLoadReportClient} is in charge of recording client side load stats, collecting - * backend cost metrics and sending load reports to the remote balancer. It shares the same - * channel with {@link XdsLoadBalancer} and its lifecycle is managed by {@link XdsLoadBalancer}. + * An {@link LoadReportClient} is the gRPC client's load reporting agent that establishes + * connections to traffic director for reporting load stats from gRPC client's perspective. + * + *

Its operations should be self-contained and running independently along with xDS load + * balancer's load balancing protocol, although it shares the same channel to traffic director with + * xDS load balancer's load balancing protocol. + * + *

Its lifecycle is managed by the high-level xDS load balancer. */ @NotThreadSafe -interface XdsLoadReportClient { +interface LoadReportClient { /** * Establishes load reporting communication and negotiates with the remote balancer to report load - * stats periodically. Calling this method on an already started {@link XdsLoadReportClient} is + * stats periodically. Calling this method on an already started {@link LoadReportClient} is * no-op. * *

This method is not thread-safe and should be called from the same synchronized context @@ -37,11 +42,11 @@ interface XdsLoadReportClient { * @param callback containing methods to be invoked for passing information received from load * reporting responses to xDS load balancer. */ - void startLoadReporting(XdsLoadReportCallback callback); + void startLoadReporting(LoadReportCallback callback); /** * Terminates load reporting. Calling this method on an already stopped - * {@link XdsLoadReportClient} is no-op. + * {@link LoadReportClient} is no-op. * *

This method is not thread-safe and should be called from the same synchronized context * returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}. @@ -55,7 +60,7 @@ interface XdsLoadReportClient { *

Implementations are not required to be thread-safe as callbacks will be invoked in xDS load * balancer's {@link io.grpc.SynchronizationContext}. */ - interface XdsLoadReportCallback { + interface LoadReportCallback { /** * The load reporting interval has been received. diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadReportClientImpl.java b/xds/src/main/java/io/grpc/xds/LoadReportClientImpl.java similarity index 88% rename from xds/src/main/java/io/grpc/xds/XdsLoadReportClientImpl.java rename to xds/src/main/java/io/grpc/xds/LoadReportClientImpl.java index d2bfbeea40..d57377e186 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadReportClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClientImpl.java @@ -50,12 +50,13 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; /** - * Client of XDS load reporting service. Methods in this class are expected to be called in - * the same synchronized context that {@link XdsLoadBalancer.Helper#getSynchronizationContext} - * returns. + * Client of xDS load reporting service. + * + *

Methods in this class are expected to be called in the same synchronized context that {@link + * XdsLoadBalancer.Helper#getSynchronizationContext} returns. */ @NotThreadSafe -final class XdsLoadReportClientImpl implements XdsLoadReportClient { +final class LoadReportClientImpl implements LoadReportClient { @VisibleForTesting static final String TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD @@ -70,32 +71,31 @@ final class XdsLoadReportClientImpl implements XdsLoadReportClient { private final Stopwatch retryStopwatch; private final ChannelLogger logger; private final BackoffPolicy.Provider backoffPolicyProvider; - private final StatsStore statsStore; + private final LoadStatsStore loadStatsStore; private boolean started; @Nullable private BackoffPolicy lrsRpcRetryPolicy; @Nullable private ScheduledHandle lrsRpcRetryTimer; - @Nullable private LrsStream lrsStream; @Nullable - private XdsLoadReportCallback callback; + private LoadReportCallback callback; - XdsLoadReportClientImpl(ManagedChannel channel, + private LoadReportClientImpl(ManagedChannel channel, Helper helper, BackoffPolicy.Provider backoffPolicyProvider, - StatsStore statsStore) { - this(channel, helper, GrpcUtil.STOPWATCH_SUPPLIER, backoffPolicyProvider, statsStore); + LoadStatsStore loadStatsStore) { + this(channel, helper, GrpcUtil.STOPWATCH_SUPPLIER, backoffPolicyProvider, loadStatsStore); } @VisibleForTesting - XdsLoadReportClientImpl(ManagedChannel channel, + LoadReportClientImpl(ManagedChannel channel, Helper helper, Supplier stopwatchSupplier, BackoffPolicy.Provider backoffPolicyProvider, - StatsStore statsStore) { + LoadStatsStore loadStatsStore) { this.channel = checkNotNull(channel, "channel"); this.serviceName = checkNotNull(helper.getAuthority(), "serviceName"); this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); @@ -104,12 +104,12 @@ final class XdsLoadReportClientImpl implements XdsLoadReportClient { this.logger = checkNotNull(helper.getChannelLogger(), "logger"); this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timeService"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); - this.statsStore = checkNotNull(statsStore, "statsStore"); + this.loadStatsStore = checkNotNull(loadStatsStore, "loadStatsStore"); started = false; } @Override - public void startLoadReporting(XdsLoadReportCallback callback) { + public void startLoadReporting(LoadReportCallback callback) { if (started) { return; } @@ -235,7 +235,7 @@ final class XdsLoadReportClientImpl implements XdsLoadReportClient { long interval = reportStopwatch.elapsed(TimeUnit.NANOSECONDS); reportStopwatch.reset().start(); ClusterStats report = - statsStore.generateLoadReport() + loadStatsStore.generateLoadReport() .toBuilder() .setClusterName(clusterName) .setLoadReportInterval(Durations.fromNanos(interval)) @@ -346,25 +346,29 @@ final class XdsLoadReportClientImpl implements XdsLoadReportClient { } } - abstract static class XdsLoadReportClientFactory { + /** + * Factory class for creating {@link LoadReportClient} instances. + */ + abstract static class LoadReportClientFactory { - private static final XdsLoadReportClientFactory DEFAULT_INSTANCE = - new XdsLoadReportClientFactory() { + private static final LoadReportClientFactory DEFAULT_INSTANCE = + new LoadReportClientFactory() { @Override - XdsLoadReportClient createLoadReportClient( + LoadReportClient createLoadReportClient( ManagedChannel channel, Helper helper, Provider backoffPolicyProvider, - StatsStore statsStore) { - return new XdsLoadReportClientImpl(channel, helper, backoffPolicyProvider, statsStore); + LoadStatsStore loadStatsStore) { + return new LoadReportClientImpl(channel, helper, backoffPolicyProvider, + loadStatsStore); } }; - static XdsLoadReportClientFactory getInstance() { + static LoadReportClientFactory getInstance() { return DEFAULT_INSTANCE; } - abstract XdsLoadReportClient createLoadReportClient(ManagedChannel channel, Helper helper, - BackoffPolicy.Provider backoffPolicyProvider, StatsStore statsStore); + abstract LoadReportClient createLoadReportClient(ManagedChannel channel, Helper helper, + BackoffPolicy.Provider backoffPolicyProvider, LoadStatsStore loadStatsStore); } } diff --git a/xds/src/main/java/io/grpc/xds/LoadStatsStore.java b/xds/src/main/java/io/grpc/xds/LoadStatsStore.java new file mode 100644 index 0000000000..b45f4a6fc5 --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/LoadStatsStore.java @@ -0,0 +1,110 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; +import javax.annotation.Nullable; + +/** + * Interface for client side load stats store. An {@code LoadStatsStore} maintains load stats for + * a service cluster (i.e., GSLB service) exposed by traffic director from a gRPC client's + * perspective, including dropped calls instructed by traffic director. Load stats for endpoints + * (i.e., Google backends) are aggregated in locality granularity (i.e., Google cluster) while the + * numbers of dropped calls are aggregated in cluster granularity. + * + *

An {@code LoadStatsStore} lives the same span of lifecycle as {@link XdsLoadBalancer} and + * only tracks loads for localities exposed by remote traffic director. A proper usage should be + * + *

    + *
  1. Let {@link LoadStatsStore} track the locality newly exposed by traffic director by + * calling {@link #addLocality(XdsLocality)}. + *
  2. Use the locality counter returned by {@link #getLocalityCounter(XdsLocality)} to record + * load stats for the corresponding locality. + *
  3. Tell {@link LoadStatsStore} to stop tracking the locality no longer exposed by traffic + * director by calling {@link #removeLocality(XdsLocality)}. + *
+ * + *

No locality information is needed for recording dropped calls since they are aggregated in + * cluster granularity. + * + *

Note implementations should only be responsible for keeping track of loads and generating + * load reports with load data, any load reporting information should be opaque to {@code + * LoadStatsStore} and be set outside. + */ +interface LoadStatsStore { + + /** + * Generates a {@link ClusterStats} proto message as the load report based on recorded load stats + * (including RPC * counts, backend metrics and dropped calls) for the interval since the previous + * call of this method. + * + *

Loads for localities no longer under tracking will not be included in generated load reports + * once all of theirs loads are completed and reported. + * + *

The fields {@code cluster_name} and {@code load_report_interval} in the returned {@link + * ClusterStats} needs to be set before it is ready to be sent to the traffic directory for load + * reporting. + * + *

This method is not thread-safe and should be called from the same synchronized context + * returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}. + */ + ClusterStats generateLoadReport(); + + /** + * Starts tracking load stats for endpoints in the provided locality. Only load stats for + * endpoints in added localities will be recorded and included in generated load reports. + * + *

This method needs to be called at locality updates only for newly assigned localities in + * balancer discovery responses before recording loads for those localities. + * + *

This method is not thread-safe and should be called from the same synchronized context + * returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}. + */ + void addLocality(XdsLocality locality); + + /** + * Stops tracking load stats for endpoints in the provided locality. gRPC clients are expected not + * to send loads to localities no longer exposed by traffic director. Load stats for endpoints in + * removed localities will no longer be included in future generated load reports after their + * recorded and ongoing loads have been reported. + * + *

This method needs to be called at locality updates only for newly removed localities. + * Forgetting calling this method for localities no longer under track will result in memory + * waste and keep including zero-load upstream locality stats in generated load reports. + * + *

This method is not thread-safe and should be called from the same synchronized context + * returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}. + */ + void removeLocality(XdsLocality locality); + + /** + * Returns the locality counter that does locality level stats aggregation for the provided + * locality. If the provided locality is not tracked, {@code null} will be returned. + * + *

This method is thread-safe. + */ + @Nullable + ClientLoadCounter getLocalityCounter(XdsLocality locality); + + /** + * Records a drop decision made by a {@link io.grpc.LoadBalancer.SubchannelPicker} instance + * with the provided category. Drops are aggregated in cluster granularity. + * + *

This method is thread-safe. + */ + void recordDroppedRequest(String category); +} diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadStatsStore.java b/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java similarity index 86% rename from xds/src/main/java/io/grpc/xds/XdsLoadStatsStore.java rename to xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java index 9d40a3e745..c3fa05cdff 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadStatsStore.java +++ b/xds/src/main/java/io/grpc/xds/LoadStatsStoreImpl.java @@ -33,31 +33,29 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.concurrent.NotThreadSafe; /** - * An {@link XdsLoadStatsStore} instance holds the client side load stats for a cluster. + * An {@link LoadStatsStoreImpl} instance holds the load stats for a cluster from an gRPC + * client's perspective by maintaining a set of locality counters for each locality it is tracking + * loads for. */ @NotThreadSafe -final class XdsLoadStatsStore implements StatsStore { +final class LoadStatsStoreImpl implements LoadStatsStore { private final ConcurrentMap localityLoadCounters; - // Cluster level dropped request counts for each category specified in the DropOverload policy. + // Cluster level dropped request counts for each category decision made by xDS load balancer. private final ConcurrentMap dropCounters; - XdsLoadStatsStore() { + LoadStatsStoreImpl() { this(new ConcurrentHashMap(), new ConcurrentHashMap()); } @VisibleForTesting - XdsLoadStatsStore(ConcurrentMap localityLoadCounters, + LoadStatsStoreImpl(ConcurrentMap localityLoadCounters, ConcurrentMap dropCounters) { this.localityLoadCounters = checkNotNull(localityLoadCounters, "localityLoadCounters"); this.dropCounters = checkNotNull(dropCounters, "dropCounters"); } - /** - * Generates a {@link ClusterStats} containing client side load stats and backend metrics - * (if any) in locality granularity. - */ @Override public ClusterStats generateLoadReport() { ClusterStats.Builder statsBuilder = ClusterStats.newBuilder(); @@ -96,10 +94,6 @@ final class XdsLoadStatsStore implements StatsStore { return statsBuilder.build(); } - /** - * Create a {@link ClientLoadCounter} for the provided locality or make it active if already in - * this {@link XdsLoadStatsStore}. - */ @Override public void addLocality(final XdsLocality locality) { ClientLoadCounter counter = localityLoadCounters.get(locality); @@ -112,10 +106,6 @@ final class XdsLoadStatsStore implements StatsStore { } } - /** - * Deactivate the {@link ClientLoadCounter} for the provided locality in by this - * {@link XdsLoadStatsStore}. - */ @Override public void removeLocality(final XdsLocality locality) { ClientLoadCounter counter = localityLoadCounters.get(locality); diff --git a/xds/src/main/java/io/grpc/xds/LocalityStore.java b/xds/src/main/java/io/grpc/xds/LocalityStore.java index d8a51ae81d..d30d01c776 100644 --- a/xds/src/main/java/io/grpc/xds/LocalityStore.java +++ b/xds/src/main/java/io/grpc/xds/LocalityStore.java @@ -77,7 +77,7 @@ interface LocalityStore { void updateOobMetricsReportInterval(long reportIntervalNano); - StatsStore getStatsStore(); + LoadStatsStore getLoadStatsStore(); final class LocalityStoreImpl implements LocalityStore { private static final String ROUND_ROBIN = "round_robin"; @@ -86,7 +86,7 @@ interface LocalityStore { private final PickerFactory pickerFactory; private final LoadBalancerProvider loadBalancerProvider; private final ThreadSafeRandom random; - private final StatsStore statsStore; + private final LoadStatsStore loadStatsStore; private final OrcaPerRequestUtil orcaPerRequestUtil; private final OrcaOobUtil orcaOobUtil; @@ -96,7 +96,7 @@ interface LocalityStore { LocalityStoreImpl(Helper helper, LoadBalancerRegistry lbRegistry) { this(helper, pickerFactoryImpl, lbRegistry, ThreadSafeRandom.ThreadSafeRandomImpl.instance, - new XdsLoadStatsStore(), OrcaPerRequestUtil.getInstance(), OrcaOobUtil.getInstance()); + new LoadStatsStoreImpl(), OrcaPerRequestUtil.getInstance(), OrcaOobUtil.getInstance()); } @VisibleForTesting @@ -105,7 +105,7 @@ interface LocalityStore { PickerFactory pickerFactory, LoadBalancerRegistry lbRegistry, ThreadSafeRandom random, - StatsStore statsStore, + LoadStatsStore loadStatsStore, OrcaPerRequestUtil orcaPerRequestUtil, OrcaOobUtil orcaOobUtil) { this.helper = checkNotNull(helper, "helper"); @@ -114,7 +114,7 @@ interface LocalityStore { lbRegistry.getProvider(ROUND_ROBIN), "Unable to find '%s' LoadBalancer", ROUND_ROBIN); this.random = checkNotNull(random, "random"); - this.statsStore = checkNotNull(statsStore, "statsStore"); + this.loadStatsStore = checkNotNull(loadStatsStore, "loadStatsStore"); this.orcaPerRequestUtil = checkNotNull(orcaPerRequestUtil, "orcaPerRequestUtil"); this.orcaOobUtil = checkNotNull(orcaOobUtil, "orcaOobUtil"); } @@ -129,15 +129,15 @@ interface LocalityStore { final ImmutableList dropOverloads; final SubchannelPicker delegate; final ThreadSafeRandom random; - final StatsStore statsStore; + final LoadStatsStore loadStatsStore; DroppablePicker( ImmutableList dropOverloads, SubchannelPicker delegate, - ThreadSafeRandom random, StatsStore statsStore) { + ThreadSafeRandom random, LoadStatsStore loadStatsStore) { this.dropOverloads = dropOverloads; this.delegate = delegate; this.random = random; - this.statsStore = statsStore; + this.loadStatsStore = loadStatsStore; } @Override @@ -145,7 +145,7 @@ interface LocalityStore { for (DropOverload dropOverload : dropOverloads) { int rand = random.nextInt(1000_000); if (rand < dropOverload.dropsPerMillion) { - statsStore.recordDroppedRequest(dropOverload.category); + loadStatsStore.recordDroppedRequest(dropOverload.category); return PickResult.withDrop(Status.UNAVAILABLE.withDescription( "dropped by loadbalancer: " + dropOverload.toString())); } @@ -184,7 +184,7 @@ interface LocalityStore { public void reset() { for (XdsLocality locality : localityMap.keySet()) { localityMap.get(locality).shutdown(); - statsStore.removeLocality(locality); + loadStatsStore.removeLocality(locality); } localityMap = ImmutableMap.of(); } @@ -221,9 +221,10 @@ interface LocalityStore { oldLocalityLbInfo.childBalancer, childHelper); } else { - statsStore.addLocality(newLocality); + loadStatsStore.addLocality(newLocality); childHelper = - new ChildHelper(newLocality, statsStore.getLocalityCounter(newLocality), orcaOobUtil); + new ChildHelper(newLocality, loadStatsStore.getLocalityCounter(newLocality), + orcaOobUtil); localityLbInfo = new LocalityLbInfo( localityInfoMap.get(newLocality).localityWeight, @@ -260,7 +261,7 @@ interface LocalityStore { @Override public void run() { for (XdsLocality locality : toRemove) { - statsStore.removeLocality(locality); + loadStatsStore.removeLocality(locality); } } }); @@ -272,8 +273,8 @@ interface LocalityStore { } @Override - public StatsStore getStatsStore() { - return statsStore; + public LoadStatsStore getLoadStatsStore() { + return loadStatsStore; } @Override @@ -348,7 +349,7 @@ interface LocalityStore { } if (!dropOverloads.isEmpty()) { - picker = new DroppablePicker(dropOverloads, picker, random, statsStore); + picker = new DroppablePicker(dropOverloads, picker, random, loadStatsStore); if (state == null) { state = IDLE; } diff --git a/xds/src/main/java/io/grpc/xds/StatsStore.java b/xds/src/main/java/io/grpc/xds/StatsStore.java deleted file mode 100644 index 4e6d949f54..0000000000 --- a/xds/src/main/java/io/grpc/xds/StatsStore.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; -import javax.annotation.Nullable; - -/** - * Interface for client side load stats store. A {@code StatsStore} implementation should only be - * responsible for keeping track of load data aggregation, any load reporting information should - * be opaque to {@code StatsStore} and be set outside. - */ -interface StatsStore { - /** - * Generates a {@link ClusterStats} containing load stats and backend metrics in locality - * granularity, as well service level drop stats for the interval since the previous call of - * this method. The fields cluster_name and load_report_interval in the returned - * {@link ClusterStats} needs to be set before it is ready to be sent to the traffic directory - * for load reporting. - * - *

This method should be called in the same synchronized context that - * {@link XdsLoadBalancer.Helper#getSynchronizationContext} returns. - */ - ClusterStats generateLoadReport(); - - /** - * Tracks load stats for endpoints in the provided locality. To be called upon balancer locality - * updates only for newly assigned localities. Only load stats for endpoints in added localities - * will be reported to the remote balancer. This method needs to be called at locality updates - * only for newly assigned localities in balancer discovery responses. - * - *

This method is not thread-safe and should be called from the same synchronized context - * returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}. - */ - void addLocality(XdsLocality locality); - - /** - * Stops tracking load stats for endpoints in the provided locality. To be called upon balancer - * locality updates only for newly removed localities. Load stats for endpoints in removed - * localities will no longer be reported to the remote balancer when client stop sending loads - * to them. - * - *

This method is not thread-safe and should be called from the same synchronized context * - * returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}. - */ - void removeLocality(XdsLocality locality); - - /** - * Returns the {@link ClientLoadCounter} that does locality level stats aggregation for the - * provided locality. If the provided locality is not tracked, {@code null} will be returned. - * - *

This method is thread-safe. - */ - @Nullable - ClientLoadCounter getLocalityCounter(XdsLocality locality); - - /** - * Records a drop decision made by a {@link io.grpc.LoadBalancer.SubchannelPicker} instance - * with the provided category. Drops are aggregated in service level. - * - *

This method is thread-safe. - */ - void recordDroppedRequest(String category); -} diff --git a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java index ffe2f144a2..288cbb8b34 100644 --- a/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/XdsLoadBalancer.java @@ -43,10 +43,10 @@ import io.grpc.internal.BackoffPolicy; import io.grpc.internal.GrpcAttributes; import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.util.ForwardingLoadBalancerHelper; +import io.grpc.xds.LoadReportClient.LoadReportCallback; +import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory; import io.grpc.xds.LocalityStore.LocalityStoreImpl; import io.grpc.xds.XdsComms.AdsStreamCallback; -import io.grpc.xds.XdsLoadReportClient.XdsLoadReportCallback; -import io.grpc.xds.XdsLoadReportClientImpl.XdsLoadReportClientFactory; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.List; import java.util.Map; @@ -65,10 +65,10 @@ final class XdsLoadBalancer extends LoadBalancer { private final LoadBalancerRegistry lbRegistry; private final FallbackManager fallbackManager; private final BackoffPolicy.Provider backoffPolicyProvider; - private final XdsLoadReportClientFactory lrsClientFactory; + private final LoadReportClientFactory lrsClientFactory; @Nullable - private XdsLoadReportClient lrsClient; + private LoadReportClient lrsClient; @Nullable private XdsLbState xdsLbState; private final AdsStreamCallback adsStreamCallback = new AdsStreamCallback() { @@ -100,8 +100,8 @@ final class XdsLoadBalancer extends LoadBalancer { } }; - private final XdsLoadReportCallback lrsCallback = - new XdsLoadReportCallback() { + private final LoadReportCallback lrsCallback = + new LoadReportCallback() { @Override public void onReportResponse(long reportIntervalNano) { @@ -113,14 +113,14 @@ final class XdsLoadBalancer extends LoadBalancer { XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, BackoffPolicy.Provider backoffPolicyProvider) { - this(helper, lbRegistry, backoffPolicyProvider, XdsLoadReportClientFactory.getInstance(), + this(helper, lbRegistry, backoffPolicyProvider, LoadReportClientFactory.getInstance(), new FallbackManager(helper, lbRegistry)); } private XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, BackoffPolicy.Provider backoffPolicyProvider, - XdsLoadReportClientFactory lrsClientFactory, + LoadReportClientFactory lrsClientFactory, FallbackManager fallbackManager) { this(helper, lbRegistry, backoffPolicyProvider, lrsClientFactory, fallbackManager, new LocalityStoreImpl(new LocalityStoreHelper(helper, fallbackManager), lbRegistry)); @@ -130,7 +130,7 @@ final class XdsLoadBalancer extends LoadBalancer { XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, BackoffPolicy.Provider backoffPolicyProvider, - XdsLoadReportClientFactory lrsClientFactory, + LoadReportClientFactory lrsClientFactory, FallbackManager fallbackManager, LocalityStore localityStore) { this.helper = checkNotNull(helper, "helper"); @@ -203,7 +203,7 @@ final class XdsLoadBalancer extends LoadBalancer { lbChannel = initLbChannel(helper, newBalancerName); lrsClient = lrsClientFactory.createLoadReportClient(lbChannel, helper, backoffPolicyProvider, - localityStore.getStatsStore()); + localityStore.getLoadStatsStore()); } else if (!newBalancerName.equals(xdsLbState.balancerName)) { lrsClient.stopLoadReporting(); ManagedChannel oldChannel = @@ -214,7 +214,7 @@ final class XdsLoadBalancer extends LoadBalancer { lbChannel = initLbChannel(helper, newBalancerName); lrsClient = lrsClientFactory.createLoadReportClient(lbChannel, helper, backoffPolicyProvider, - localityStore.getStatsStore()); + localityStore.getLoadStatsStore()); } else if (!Objects.equal( getPolicyNameOrNull(childPolicy), getPolicyNameOrNull(xdsLbState.childPolicy))) { diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadReportClientImplTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientImplTest.java similarity index 94% rename from xds/src/test/java/io/grpc/xds/XdsLoadReportClientImplTest.java rename to xds/src/test/java/io/grpc/xds/LoadReportClientImplTest.java index 3781c793e1..e7e13494ab 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLoadReportClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadReportClientImplTest.java @@ -53,7 +53,7 @@ import io.grpc.internal.BackoffPolicy; import io.grpc.internal.FakeClock; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; -import io.grpc.xds.XdsLoadReportClient.XdsLoadReportCallback; +import io.grpc.xds.LoadReportClient.LoadReportCallback; import java.text.MessageFormat; import java.util.ArrayDeque; import java.util.concurrent.ThreadLocalRandom; @@ -73,10 +73,10 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; /** - * Unit tests for {@link XdsLoadReportClientImpl}. + * Unit tests for {@link LoadReportClientImpl}. */ @RunWith(JUnit4.class) -public class XdsLoadReportClientImplTest { +public class LoadReportClientImplTest { private static final String SERVICE_AUTHORITY = "api.google.com"; private static final String CLUSTER_NAME = "gslb-namespace:gslb-service-name"; @@ -85,7 +85,7 @@ public class XdsLoadReportClientImplTest { @Override public boolean shouldAccept(Runnable command) { return command.toString() - .contains(XdsLoadReportClientImpl.LoadReportingTask.class.getSimpleName()); + .contains(LoadReportClientImpl.LoadReportingTask.class.getSimpleName()); } }; private static final FakeClock.TaskFilter LRS_RPC_RETRY_TASK_FILTER = @@ -93,7 +93,7 @@ public class XdsLoadReportClientImplTest { @Override public boolean shouldAccept(Runnable command) { return command.toString() - .contains(XdsLoadReportClientImpl.LrsRpcRetryTask.class.getSimpleName()); + .contains(LoadReportClientImpl.LrsRpcRetryTask.class.getSimpleName()); } }; private static final Locality TEST_LOCALITY = @@ -102,6 +102,14 @@ public class XdsLoadReportClientImplTest { .setZone("test_zone") .setSubZone("test_subzone") .build(); + private static final LoadStatsRequest EXPECTED_INITIAL_REQ = LoadStatsRequest.newBuilder() + .setNode(Node.newBuilder() + .setMetadata(Struct.newBuilder() + .putFields( + LoadReportClientImpl.TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD, + Value.newBuilder().setStringValue(SERVICE_AUTHORITY).build()))) + .build(); + @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); private final SynchronizationContext syncContext = new SynchronizationContext( @@ -123,46 +131,28 @@ public class XdsLoadReportClientImplTest { log(level, MessageFormat.format(template, args)); } }; - private LoadReportingServiceGrpc.LoadReportingServiceImplBase mockLoadReportingService; private final FakeClock fakeClock = new FakeClock(); private final ArrayDeque> lrsRequestObservers = new ArrayDeque<>(); - @Captor - private ArgumentCaptor> lrsResponseObserverCaptor; @Mock private Helper helper; @Mock private BackoffPolicy.Provider backoffPolicyProvider; - private static final LoadStatsRequest EXPECTED_INITIAL_REQ = LoadStatsRequest.newBuilder() - .setNode(Node.newBuilder() - .setMetadata(Struct.newBuilder() - .putFields( - XdsLoadReportClientImpl.TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD, - Value.newBuilder().setStringValue(SERVICE_AUTHORITY).build()))) - .build(); @Mock private BackoffPolicy backoffPolicy1; - private ManagedChannel channel; - private XdsLoadReportClientImpl lrsClient; @Mock private BackoffPolicy backoffPolicy2; @Mock - private StatsStore statsStore; + private LoadStatsStore loadStatsStore; @Mock - private XdsLoadReportCallback callback; + private LoadReportCallback callback; + @Captor + private ArgumentCaptor> lrsResponseObserverCaptor; - private static ClusterStats buildEmptyClusterStats(long loadReportIntervalNanos) { - return ClusterStats.newBuilder() - .setClusterName(CLUSTER_NAME) - .setLoadReportInterval(Durations.fromNanos(loadReportIntervalNanos)).build(); - } - - private static LoadStatsResponse buildLrsResponse(long loadReportIntervalNanos) { - return LoadStatsResponse.newBuilder() - .addClusters(CLUSTER_NAME) - .setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNanos)).build(); - } + private LoadReportingServiceGrpc.LoadReportingServiceImplBase mockLoadReportingService; + private ManagedChannel channel; + private LoadReportClientImpl lrsClient; @SuppressWarnings("unchecked") @Before @@ -203,9 +193,9 @@ public class XdsLoadReportClientImplTest { when(backoffPolicy2.nextBackoffNanos()) .thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L)); lrsClient = - new XdsLoadReportClientImpl(channel, helper, fakeClock.getStopwatchSupplier(), + new LoadReportClientImpl(channel, helper, fakeClock.getStopwatchSupplier(), backoffPolicyProvider, - statsStore); + loadStatsStore); lrsClient.startLoadReporting(callback); } @@ -214,28 +204,6 @@ public class XdsLoadReportClientImplTest { lrsClient.stopLoadReporting(); } - private void assertNextReport(InOrder inOrder, StreamObserver requestObserver, - ClusterStats expectedStats) { - long loadReportIntervalNanos = Durations.toNanos(expectedStats.getLoadReportInterval()); - assertEquals(0, fakeClock.forwardTime(loadReportIntervalNanos - 1, TimeUnit.NANOSECONDS)); - inOrder.verifyNoMoreInteractions(); - assertEquals(1, fakeClock.forwardTime(1, TimeUnit.NANOSECONDS)); - // A second load report is scheduled upon the first is sent. - assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); - inOrder.verify(statsStore).generateLoadReport(); - ArgumentCaptor reportCaptor = ArgumentCaptor.forClass(null); - inOrder.verify(requestObserver).onNext(reportCaptor.capture()); - LoadStatsRequest report = reportCaptor.getValue(); - assertEquals(report.getNode(), Node.newBuilder() - .setMetadata(Struct.newBuilder() - .putFields( - XdsLoadReportClientImpl.TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD, - Value.newBuilder().setStringValue(SERVICE_AUTHORITY).build())) - .build()); - assertEquals(1, report.getClusterStatsCount()); - assertThat(report.getClusterStats(0)).isEqualTo(expectedStats); - } - @Test public void loadReportInitialRequest() { verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture()); @@ -277,8 +245,8 @@ public class XdsLoadReportClientImplTest { StreamObserver responseObserver = lrsResponseObserverCaptor.getValue(); assertThat(lrsRequestObservers).hasSize(1); StreamObserver requestObserver = lrsRequestObservers.poll(); - when(statsStore.generateLoadReport()).thenReturn(ClusterStats.newBuilder().build()); - InOrder inOrder = inOrder(requestObserver, statsStore); + when(loadStatsStore.generateLoadReport()).thenReturn(ClusterStats.newBuilder().build()); + InOrder inOrder = inOrder(requestObserver, loadStatsStore); inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ); assertThat(logs).containsExactly("DEBUG: Initial LRS request sent: " + EXPECTED_INITIAL_REQ); logs.poll(); @@ -297,9 +265,9 @@ public class XdsLoadReportClientImplTest { assertThat(lrsRequestObservers).hasSize(1); StreamObserver requestObserver = lrsRequestObservers.poll(); - when(statsStore.generateLoadReport()).thenReturn(ClusterStats.newBuilder().build()); + when(loadStatsStore.generateLoadReport()).thenReturn(ClusterStats.newBuilder().build()); - InOrder inOrder = inOrder(requestObserver, statsStore); + InOrder inOrder = inOrder(requestObserver, loadStatsStore); inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ); assertThat(logs).containsExactly("DEBUG: Initial LRS request sent: " + EXPECTED_INITIAL_REQ); logs.poll(); @@ -325,7 +293,7 @@ public class XdsLoadReportClientImplTest { StreamObserver responseObserver = lrsResponseObserverCaptor.getValue(); assertThat(lrsRequestObservers).hasSize(1); StreamObserver requestObserver = lrsRequestObservers.poll(); - InOrder inOrder = inOrder(requestObserver, statsStore); + InOrder inOrder = inOrder(requestObserver, loadStatsStore); inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ); long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); @@ -366,7 +334,7 @@ public class XdsLoadReportClientImplTest { .setDroppedCount(0)) .setTotalDroppedRequests(0) .build(); - when(statsStore.generateLoadReport()).thenReturn(expectedStats1, expectedStats2); + when(loadStatsStore.generateLoadReport()).thenReturn(expectedStats1, expectedStats2); responseObserver.onNext(buildLrsResponse(1362)); assertNextReport(inOrder, requestObserver, expectedStats1); @@ -519,4 +487,38 @@ public class XdsLoadReportClientImplTest { inOrder.verify(requestObserver, never()).onNext(any(LoadStatsRequest.class)); assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); } + + private static ClusterStats buildEmptyClusterStats(long loadReportIntervalNanos) { + return ClusterStats.newBuilder() + .setClusterName(CLUSTER_NAME) + .setLoadReportInterval(Durations.fromNanos(loadReportIntervalNanos)).build(); + } + + private static LoadStatsResponse buildLrsResponse(long loadReportIntervalNanos) { + return LoadStatsResponse.newBuilder() + .addClusters(CLUSTER_NAME) + .setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNanos)).build(); + } + + private void assertNextReport(InOrder inOrder, StreamObserver requestObserver, + ClusterStats expectedStats) { + long loadReportIntervalNanos = Durations.toNanos(expectedStats.getLoadReportInterval()); + assertEquals(0, fakeClock.forwardTime(loadReportIntervalNanos - 1, TimeUnit.NANOSECONDS)); + inOrder.verifyNoMoreInteractions(); + assertEquals(1, fakeClock.forwardTime(1, TimeUnit.NANOSECONDS)); + // A second load report is scheduled upon the first is sent. + assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); + inOrder.verify(loadStatsStore).generateLoadReport(); + ArgumentCaptor reportCaptor = ArgumentCaptor.forClass(null); + inOrder.verify(requestObserver).onNext(reportCaptor.capture()); + LoadStatsRequest report = reportCaptor.getValue(); + assertEquals(report.getNode(), Node.newBuilder() + .setMetadata(Struct.newBuilder() + .putFields( + LoadReportClientImpl.TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD, + Value.newBuilder().setStringValue(SERVICE_AUTHORITY).build())) + .build()); + assertEquals(1, report.getClusterStatsCount()); + assertThat(report.getClusterStats(0)).isEqualTo(expectedStats); + } } diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadStatsStoreTest.java b/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java similarity index 91% rename from xds/src/test/java/io/grpc/xds/XdsLoadStatsStoreTest.java rename to xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java index a18763c783..93dc5bb67b 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLoadStatsStoreTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadStatsStoreImplTest.java @@ -40,22 +40,22 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Unit tests for {@link XdsLoadStatsStore}. */ +/** Unit tests for {@link LoadStatsStore}. */ @RunWith(JUnit4.class) -public class XdsLoadStatsStoreTest { +public class LoadStatsStoreImplTest { private static final XdsLocality LOCALITY1 = new XdsLocality("test_region1", "test_zone", "test_subzone"); private static final XdsLocality LOCALITY2 = new XdsLocality("test_region2", "test_zone", "test_subzone"); private ConcurrentMap localityLoadCounters; private ConcurrentMap dropCounters; - private XdsLoadStatsStore loadStore; + private LoadStatsStore loadStatsStore; @Before public void setUp() { localityLoadCounters = new ConcurrentHashMap<>(); dropCounters = new ConcurrentHashMap<>(); - loadStore = new XdsLoadStatsStore(localityLoadCounters, dropCounters); + loadStatsStore = new LoadStatsStoreImpl(localityLoadCounters, dropCounters); } private static List buildEndpointLoadMetricStatsList( @@ -155,25 +155,25 @@ public class XdsLoadStatsStoreTest { @Test public void addAndGetAndRemoveLocality() { - loadStore.addLocality(LOCALITY1); + loadStatsStore.addLocality(LOCALITY1); assertThat(localityLoadCounters).containsKey(LOCALITY1); // Adding the same locality counter again causes an exception. try { - loadStore.addLocality(LOCALITY1); + loadStatsStore.addLocality(LOCALITY1); Assert.fail(); } catch (IllegalStateException expected) { assertThat(expected).hasMessageThat() .contains("An active counter for locality " + LOCALITY1 + " already exists"); } - assertThat(loadStore.getLocalityCounter(LOCALITY1)) + assertThat(loadStatsStore.getLocalityCounter(LOCALITY1)) .isSameInstanceAs(localityLoadCounters.get(LOCALITY1)); - assertThat(loadStore.getLocalityCounter(LOCALITY2)).isNull(); + assertThat(loadStatsStore.getLocalityCounter(LOCALITY2)).isNull(); // Removing an non-existing locality counter causes an exception. try { - loadStore.removeLocality(LOCALITY2); + loadStatsStore.removeLocality(LOCALITY2); Assert.fail(); } catch (IllegalStateException expected) { assertThat(expected).hasMessageThat() @@ -181,12 +181,12 @@ public class XdsLoadStatsStoreTest { } // Removing the locality counter only mark it as inactive, but not throw it away. - loadStore.removeLocality(LOCALITY1); + loadStatsStore.removeLocality(LOCALITY1); assertThat(localityLoadCounters.get(LOCALITY1).isActive()).isFalse(); // Removing an inactive locality counter causes an exception. try { - loadStore.removeLocality(LOCALITY1); + loadStatsStore.removeLocality(LOCALITY1); Assert.fail(); } catch (IllegalStateException expected) { assertThat(expected).hasMessageThat() @@ -194,7 +194,7 @@ public class XdsLoadStatsStoreTest { } // Adding it back simply mark it as active again. - loadStore.addLocality(LOCALITY1); + loadStatsStore.addLocality(LOCALITY1); assertThat(localityLoadCounters.get(LOCALITY1).isActive()).isTrue(); } @@ -204,7 +204,7 @@ public class XdsLoadStatsStoreTest { ClientLoadCounter inactiveCounter = new ClientLoadCounter(); inactiveCounter.setActive(false); localityLoadCounters.put(LOCALITY2, inactiveCounter); - loadStore.generateLoadReport(); + loadStatsStore.generateLoadReport(); assertThat(localityLoadCounters).containsKey(LOCALITY1); assertThat(localityLoadCounters).doesNotContainKey(LOCALITY2); } @@ -241,7 +241,7 @@ public class XdsLoadStatsStoreTest { buildEndpointLoadMetricStatsList(metrics2)) ), null); - assertClusterStatsEqual(expectedReport, loadStore.generateLoadReport()); + assertClusterStatsEqual(expectedReport, loadStatsStore.generateLoadReport()); expectedReport = buildClusterStats( @@ -250,7 +250,7 @@ public class XdsLoadStatsStoreTest { buildUpstreamLocalityStats(LOCALITY2, 0, 432, 0, 0, null) ), null); - assertClusterStatsEqual(expectedReport, loadStore.generateLoadReport()); + assertClusterStatsEqual(expectedReport, loadStatsStore.generateLoadReport()); } @Test @@ -258,10 +258,10 @@ public class XdsLoadStatsStoreTest { int numLbDrop = 123; int numThrottleDrop = 456; for (int i = 0; i < numLbDrop; i++) { - loadStore.recordDroppedRequest("lb"); + loadStatsStore.recordDroppedRequest("lb"); } for (int i = 0; i < numThrottleDrop; i++) { - loadStore.recordDroppedRequest("throttle"); + loadStatsStore.recordDroppedRequest("throttle"); } assertThat(dropCounters.get("lb").get()).isEqualTo(numLbDrop); assertThat(dropCounters.get("throttle").get()).isEqualTo(numThrottleDrop); @@ -269,7 +269,7 @@ public class XdsLoadStatsStoreTest { buildClusterStats(null, Arrays.asList(buildDroppedRequests("lb", numLbDrop), buildDroppedRequests("throttle", numThrottleDrop))); - assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport()); + assertClusterStatsEqual(expectedLoadReport, loadStatsStore.generateLoadReport()); assertThat(dropCounters.get("lb").get()).isEqualTo(0); assertThat(dropCounters.get("throttle").get()).isEqualTo(0); } diff --git a/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java b/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java index e4051a7058..d300fd2ea6 100644 --- a/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java +++ b/xds/src/test/java/io/grpc/xds/LocalityStoreTest.java @@ -193,7 +193,8 @@ public class LocalityStoreTest { @Mock private OrcaOobUtil orcaOobUtil; private final FakeLoadStatsStore fakeLoadStatsStore = new FakeLoadStatsStore(); - private final StatsStore statsStore = mock(StatsStore.class, delegatesTo(fakeLoadStatsStore)); + private final LoadStatsStore loadStatsStore = + mock(LoadStatsStore.class, delegatesTo(fakeLoadStatsStore)); private LocalityStore localityStore; @@ -216,7 +217,7 @@ public class LocalityStoreTest { }); lbRegistry.register(lbProvider); localityStore = - new LocalityStoreImpl(helper, pickerFactory, lbRegistry, random, statsStore, + new LocalityStoreImpl(helper, pickerFactory, lbRegistry, random, loadStatsStore, orcaPerRequestUtil, orcaOobUtil); } @@ -229,24 +230,24 @@ public class LocalityStoreTest { localityInfoMap .put(locality2, new LocalityInfo(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2)); localityStore.updateLocalityStore(localityInfoMap); - verify(statsStore).addLocality(locality1); - verify(statsStore).addLocality(locality2); + verify(loadStatsStore).addLocality(locality1); + verify(loadStatsStore).addLocality(locality2); localityInfoMap .put(locality3, new LocalityInfo(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3)); localityStore.updateLocalityStore(localityInfoMap); - verify(statsStore).addLocality(locality3); + verify(loadStatsStore).addLocality(locality3); localityInfoMap = ImmutableMap .of(locality4, new LocalityInfo(ImmutableList.of(lbEndpoint41, lbEndpoint42), 4)); localityStore.updateLocalityStore(localityInfoMap); - verify(statsStore).removeLocality(locality1); - verify(statsStore).removeLocality(locality2); - verify(statsStore).removeLocality(locality3); - verify(statsStore).addLocality(locality4); + verify(loadStatsStore).removeLocality(locality1); + verify(loadStatsStore).removeLocality(locality2); + verify(loadStatsStore).removeLocality(locality3); + verify(loadStatsStore).addLocality(locality4); localityStore.updateLocalityStore(Collections.EMPTY_MAP); - verify(statsStore).removeLocality(locality4); + verify(loadStatsStore).removeLocality(locality4); } @Test @@ -532,30 +533,30 @@ public class LocalityStoreTest { verify(helper).updateBalancingState(same(IDLE), subchannelPickerCaptor.capture()); int times = 0; - InOrder inOrder = inOrder(statsStore); + InOrder inOrder = inOrder(loadStatsStore); doReturn(365, 1234).when(random).nextInt(1000_000); assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs)) .isEqualTo(PickResult.withNoResult()); verify(random, times(times += 2)).nextInt(1000_000); - inOrder.verify(statsStore, never()).recordDroppedRequest(anyString()); + inOrder.verify(loadStatsStore, never()).recordDroppedRequest(anyString()); doReturn(366, 1235).when(random).nextInt(1000_000); assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs)) .isEqualTo(PickResult.withNoResult()); verify(random, times(times += 2)).nextInt(1000_000); - inOrder.verify(statsStore, never()).recordDroppedRequest(anyString()); + inOrder.verify(loadStatsStore, never()).recordDroppedRequest(anyString()); doReturn(364, 1234).when(random).nextInt(1000_000); assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs).isDrop()) .isTrue(); verify(random, times(times += 1)).nextInt(1000_000); - inOrder.verify(statsStore).recordDroppedRequest(eq("throttle")); + inOrder.verify(loadStatsStore).recordDroppedRequest(eq("throttle")); doReturn(365, 1233).when(random).nextInt(1000_000); assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs).isDrop()) .isTrue(); verify(random, times(times += 2)).nextInt(1000_000); - inOrder.verify(statsStore).recordDroppedRequest(eq("lb")); + inOrder.verify(loadStatsStore).recordDroppedRequest(eq("lb")); // subchannel12 goes to READY CreateSubchannelArgs createSubchannelArgs = @@ -577,25 +578,25 @@ public class LocalityStoreTest { assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs) .getSubchannel()).isEqualTo(subchannel12); verify(random, times(times += 2)).nextInt(1000_000); - inOrder.verify(statsStore, never()).recordDroppedRequest(anyString()); + inOrder.verify(loadStatsStore, never()).recordDroppedRequest(anyString()); doReturn(366, 1235).when(random).nextInt(1000_000); assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs) .getSubchannel()).isEqualTo(subchannel12); verify(random, times(times += 2)).nextInt(1000_000); - inOrder.verify(statsStore, never()).recordDroppedRequest(anyString()); + inOrder.verify(loadStatsStore, never()).recordDroppedRequest(anyString()); doReturn(364, 1234).when(random).nextInt(1000_000); assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs).isDrop()) .isTrue(); verify(random, times(times += 1)).nextInt(1000_000); - inOrder.verify(statsStore).recordDroppedRequest(eq("throttle")); + inOrder.verify(loadStatsStore).recordDroppedRequest(eq("throttle")); doReturn(365, 1233).when(random).nextInt(1000_000); assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs).isDrop()) .isTrue(); verify(random, times(times + 2)).nextInt(1000_000); - inOrder.verify(statsStore).recordDroppedRequest(eq("lb")); + inOrder.verify(loadStatsStore).recordDroppedRequest(eq("lb")); inOrder.verifyNoMoreInteractions(); } @@ -639,11 +640,11 @@ public class LocalityStoreTest { verify(loadBalancers.get("sz1")).shutdown(); verify(loadBalancers.get("sz2")).shutdown(); - verify(statsStore).removeLocality(locality1); - verify(statsStore).removeLocality(locality2); + verify(loadStatsStore).removeLocality(locality1); + verify(loadStatsStore).removeLocality(locality2); } - private static final class FakeLoadStatsStore implements StatsStore { + private static final class FakeLoadStatsStore implements LoadStatsStore { Map localityCounters = new HashMap<>(); diff --git a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerWithLrsTest.java b/xds/src/test/java/io/grpc/xds/XdsLoadBalancerWithLrsTest.java index f241209389..82e1668b25 100644 --- a/xds/src/test/java/io/grpc/xds/XdsLoadBalancerWithLrsTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsLoadBalancerWithLrsTest.java @@ -54,9 +54,9 @@ import io.grpc.internal.JsonParser; import io.grpc.internal.testing.StreamRecorder; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.LoadReportClient.LoadReportCallback; +import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory; import io.grpc.xds.XdsLoadBalancer.FallbackManager; -import io.grpc.xds.XdsLoadReportClient.XdsLoadReportCallback; -import io.grpc.xds.XdsLoadReportClientImpl.XdsLoadReportClientFactory; import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -73,7 +73,7 @@ import org.mockito.MockitoAnnotations; /** * Unit tests for {@link XdsLoadBalancer}, especially for interactions between - * {@link XdsLoadBalancer} and {@link XdsLoadReportClient}. + * {@link XdsLoadBalancer} and {@link LoadReportClient}. */ @RunWith(JUnit4.class) public class XdsLoadBalancerWithLrsTest { @@ -97,11 +97,11 @@ public class XdsLoadBalancerWithLrsTest { @Mock private LocalityStore localityStore; @Mock - private XdsLoadReportClientFactory lrsClientFactory; + private LoadReportClientFactory lrsClientFactory; @Mock - private XdsLoadReportClient lrsClient; + private LoadReportClient lrsClient; @Mock - private StatsStore statsStore; + private LoadStatsStore loadStatsStore; @Mock private LoadBalancer fallbackBalancer; @Mock @@ -219,9 +219,9 @@ public class XdsLoadBalancerWithLrsTest { when(helper.getChannelLogger()).thenReturn(mock(ChannelLogger.class)); when(helper.createResolvingOobChannel(anyString())) .thenReturn(oobChannel1, oobChannel2, oobChannel3); - when(localityStore.getStatsStore()).thenReturn(statsStore); + when(localityStore.getLoadStatsStore()).thenReturn(loadStatsStore); when(lrsClientFactory.createLoadReportClient(any(ManagedChannel.class), any(Helper.class), - any(BackoffPolicy.Provider.class), any(StatsStore.class))).thenReturn(lrsClient); + any(BackoffPolicy.Provider.class), any(LoadStatsStore.class))).thenReturn(lrsClient); xdsLoadBalancer = new XdsLoadBalancer(helper, lbRegistry, backoffPolicyProvider, lrsClientFactory, @@ -247,7 +247,7 @@ public class XdsLoadBalancerWithLrsTest { verify(lrsClientFactory) .createLoadReportClient(same(oobChannel1), same(helper), same(backoffPolicyProvider), - same(statsStore)); + same(loadStatsStore)); assertThat(streamRecorder.getValues()).hasSize(1); // Let fallback timer elapse and xDS load balancer enters fallback mode on startup. @@ -256,13 +256,13 @@ public class XdsLoadBalancerWithLrsTest { fakeClock.forwardTime(10, TimeUnit.SECONDS); assertThat(fallBackLbHelper).isNotNull(); - verify(lrsClient, never()).startLoadReporting(any(XdsLoadReportCallback.class)); + verify(lrsClient, never()).startLoadReporting(any(LoadReportCallback.class)); // Simulates a syntactically incorrect EDS response. serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance()); - verify(lrsClient, never()).startLoadReporting(any(XdsLoadReportCallback.class)); + verify(lrsClient, never()).startLoadReporting(any(LoadReportCallback.class)); - ArgumentCaptor lrsCallbackCaptor = ArgumentCaptor.forClass(null); + ArgumentCaptor lrsCallbackCaptor = ArgumentCaptor.forClass(null); // Simulate a syntactically correct EDS response. DiscoveryResponse edsResponse = @@ -303,9 +303,9 @@ public class XdsLoadBalancerWithLrsTest { inOrder.verify(lrsClientFactory) .createLoadReportClient(same(oobChannel1), same(helper), same(backoffPolicyProvider), - same(statsStore)); + same(loadStatsStore)); assertThat(streamRecorder.getValues()).hasSize(1); - inOrder.verify(lrsClient, never()).startLoadReporting(any(XdsLoadReportCallback.class)); + inOrder.verify(lrsClient, never()).startLoadReporting(any(LoadReportCallback.class)); // Simulate receiving a new service config with balancer name changed before xDS protocol is // established. @@ -323,7 +323,7 @@ public class XdsLoadBalancerWithLrsTest { assertThat(streamRecorder.getValues()).hasSize(2); inOrder.verify(lrsClientFactory) .createLoadReportClient(same(oobChannel2), same(helper), same(backoffPolicyProvider), - same(statsStore)); + same(loadStatsStore)); // Simulate a syntactically correct EDS response. DiscoveryResponse edsResponse = @@ -332,7 +332,7 @@ public class XdsLoadBalancerWithLrsTest { .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") .build(); serverResponseWriter.onNext(edsResponse); - inOrder.verify(lrsClient).startLoadReporting(any(XdsLoadReportCallback.class)); + inOrder.verify(lrsClient).startLoadReporting(any(LoadReportCallback.class)); // Simulate receiving a new service config with balancer name changed. newLbConfig = (Map) JsonParser.parse( @@ -349,10 +349,10 @@ public class XdsLoadBalancerWithLrsTest { inOrder.verify(lrsClient).stopLoadReporting(); inOrder.verify(lrsClientFactory) .createLoadReportClient(same(oobChannel3), same(helper), same(backoffPolicyProvider), - same(statsStore)); + same(loadStatsStore)); serverResponseWriter.onNext(edsResponse); - inOrder.verify(lrsClient).startLoadReporting(any(XdsLoadReportCallback.class)); + inOrder.verify(lrsClient).startLoadReporting(any(LoadReportCallback.class)); inOrder.verifyNoMoreInteractions(); } @@ -370,7 +370,7 @@ public class XdsLoadBalancerWithLrsTest { verify(lrsClientFactory) .createLoadReportClient(same(oobChannel1), same(helper), same(backoffPolicyProvider), - same(statsStore)); + same(loadStatsStore)); assertThat(streamRecorder.getValues()).hasSize(1); // Simulate a syntactically correct EDS response. @@ -380,7 +380,7 @@ public class XdsLoadBalancerWithLrsTest { .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment") .build(); serverResponseWriter.onNext(edsResponse); - verify(lrsClient).startLoadReporting(any(XdsLoadReportCallback.class)); + verify(lrsClient).startLoadReporting(any(LoadReportCallback.class)); // Simulate receiving a new service config with child policy changed. @SuppressWarnings("unchecked")