xds: clean up client load reporting code (#5928)

* Cleaned up XdsLoadStatsStore.
- Renamed the StatsStore interface to XdsLoadStatsStore, with XdsLoadStatsStoreImpl as its corresponding implementation.
- Revised/reworded specification for XdsLoadStatsStore.

* Cleaned up ClientLoadCounter specification.

Reworded specification for ClientLoadCounter

* Cleaned up XdsLoadReportClient, reworded specifications, formatted tests.

* Removed Xds prefix from LoadStatsStore.

* Removed Xds prefix from XdsLoadReportClient.
Chengyuan Zhang 2019-06-26 16:27:01 -07:00 committed by GitHub
parent f0a824bb53
commit 36476cb1f8
12 changed files with 319 additions and 281 deletions


@@ -39,7 +39,8 @@ import javax.annotation.concurrent.NotThreadSafe;
 import javax.annotation.concurrent.ThreadSafe;
 
 /**
- * Client side aggregator for load stats.
+ * Client side load stats recorder that provides RPC counting and metrics recording as name-value
+ * pairs.
  *
  * <p>All methods except {@link #snapshot()} in this class are thread-safe.
  */
@@ -97,12 +98,14 @@ final class ClientLoadCounter {
   }
 
   /**
-   * Generate snapshot for recorded query counts and metrics since previous snapshot.
+   * Generates a snapshot for load stats recorded in this counter. Successive snapshots represent
+   * load stats recorded for the interval since the previous snapshot. So taking a snapshot clears
+   * the counter state except for ongoing RPC recordings.
    *
    * <p>This method is not thread-safe and must be called from {@link
    * io.grpc.LoadBalancer.Helper#getSynchronizationContext()}.
    */
-  public ClientLoadSnapshot snapshot() {
+  ClientLoadSnapshot snapshot() {
     Map<String, MetricValue> aggregatedValues = new HashMap<>();
     for (MetricRecorder recorder : metricRecorders) {
       Map<String, MetricValue> map = recorder.takeAll();
@@ -133,8 +136,8 @@ final class ClientLoadCounter {
   }
 
   /**
-   * A {@link ClientLoadSnapshot} represents a snapshot of {@link ClientLoadCounter} to be sent as
-   * part of {@link io.envoyproxy.envoy.api.v2.endpoint.ClusterStats} to the balancer.
+   * A {@link ClientLoadSnapshot} represents a snapshot of {@link ClientLoadCounter}, which is a
+   * read-only copy of load stats recorded for some period of time.
    */
   static final class ClientLoadSnapshot {
@@ -259,8 +262,8 @@ final class ClientLoadCounter {
   }
 
   /**
-   * An {@link LoadRecordingStreamTracerFactory} instance records and aggregates client-side load
-   * data into an {@link ClientLoadCounter} object.
+   * An {@link LoadRecordingStreamTracerFactory} instance for creating client stream tracers that
+   * records and aggregates client-side load data into an {@link ClientLoadCounter} object.
   */
  @ThreadSafe
  @VisibleForTesting
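For orientation, a minimal standalone sketch of the snapshot-and-reset semantics described above. It is illustrative only: it does not reuse the real ClientLoadCounter or MetricRecorder types, and its recording methods are assumptions rather than the class's actual API.

import java.util.concurrent.atomic.AtomicLong;

// Toy counter mirroring the documented behavior: interval counts reset on snapshot,
// while the in-progress gauge ("ongoing RPC recordings") carries over.
final class IntervalCounterSketch {
  private final AtomicLong callsFinished = new AtomicLong();
  private final AtomicLong callsInProgress = new AtomicLong();

  void recordCallStarted() {
    callsInProgress.incrementAndGet();
  }

  void recordCallFinished() {
    callsInProgress.decrementAndGet();
    callsFinished.incrementAndGet();
  }

  // Like snapshot() above, intended to be called from a single thread
  // (e.g., the load balancer's synchronization context).
  long[] snapshot() {
    // Finished-call counts cover only the interval since the last snapshot and are cleared here;
    // the in-progress gauge is reported as-is and keeps accumulating across snapshots.
    return new long[] {callsFinished.getAndSet(0), callsInProgress.get()};
  }
}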


@@ -19,16 +19,21 @@ package io.grpc.xds;
 import javax.annotation.concurrent.NotThreadSafe;
 
 /**
- * An {@link XdsLoadReportClient} is in charge of recording client side load stats, collecting
- * backend cost metrics and sending load reports to the remote balancer. It shares the same
- * channel with {@link XdsLoadBalancer} and its lifecycle is managed by {@link XdsLoadBalancer}.
+ * An {@link LoadReportClient} is the gRPC client's load reporting agent that establishes
+ * connections to traffic director for reporting load stats from gRPC client's perspective.
+ *
+ * <p>Its operations should be self-contained and running independently along with xDS load
+ * balancer's load balancing protocol, although it shares the same channel to traffic director with
+ * xDS load balancer's load balancing protocol.
+ *
+ * <p>Its lifecycle is managed by the high-level xDS load balancer.
  */
 @NotThreadSafe
-interface XdsLoadReportClient {
+interface LoadReportClient {
 
   /**
    * Establishes load reporting communication and negotiates with the remote balancer to report load
-   * stats periodically. Calling this method on an already started {@link XdsLoadReportClient} is
+   * stats periodically. Calling this method on an already started {@link LoadReportClient} is
    * no-op.
    *
    * <p>This method is not thread-safe and should be called from the same synchronized context
@@ -37,11 +42,11 @@ interface XdsLoadReportClient {
   * @param callback containing methods to be invoked for passing information received from load
   *     reporting responses to xDS load balancer.
   */
-  void startLoadReporting(XdsLoadReportCallback callback);
+  void startLoadReporting(LoadReportCallback callback);
 
   /**
    * Terminates load reporting. Calling this method on an already stopped
-   * {@link XdsLoadReportClient} is no-op.
+   * {@link LoadReportClient} is no-op.
    *
    * <p>This method is not thread-safe and should be called from the same synchronized context
    * returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}.
@@ -55,7 +60,7 @@ interface XdsLoadReportClient {
    * <p>Implementations are not required to be thread-safe as callbacks will be invoked in xDS load
    * balancer's {@link io.grpc.SynchronizationContext}.
    */
-  interface XdsLoadReportCallback {
+  interface LoadReportCallback {
 
     /**
      * The load reporting interval has been received.
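To make the renamed surface concrete, here is a hypothetical caller written against the two methods and the callback declared above. The nested interfaces simply mirror the signatures shown in this diff; the driver logic itself is an assumption, not code from the repository.

// Sketch of a caller driving a LoadReportClient from the balancer's synchronization context.
final class LoadReportingDriverSketch {

  interface LoadReportCallback {              // mirrors LoadReportClient.LoadReportCallback above
    void onReportResponse(long reportIntervalNano);
  }

  interface LoadReportClient {                // mirrors the renamed interface above
    void startLoadReporting(LoadReportCallback callback);

    void stopLoadReporting();
  }

  static void runOnce(LoadReportClient lrsClient) {
    LoadReportCallback callback = new LoadReportCallback() {
      @Override
      public void onReportResponse(long reportIntervalNano) {
        // e.g., forward the balancer-provided interval to the locality store / OOB metrics setup.
        System.out.println("load reporting interval (ns): " + reportIntervalNano);
      }
    };
    lrsClient.startLoadReporting(callback);   // calling again while started is a no-op
    // ... load reports are sent periodically while the xDS balancer is active ...
    lrsClient.stopLoadReporting();            // calling again while stopped is a no-op
  }
}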


@@ -50,12 +50,13 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 
 /**
- * Client of XDS load reporting service. Methods in this class are expected to be called in
- * the same synchronized context that {@link XdsLoadBalancer.Helper#getSynchronizationContext}
- * returns.
+ * Client of xDS load reporting service.
+ *
+ * <p>Methods in this class are expected to be called in the same synchronized context that {@link
+ * XdsLoadBalancer.Helper#getSynchronizationContext} returns.
 */
 @NotThreadSafe
-final class XdsLoadReportClientImpl implements XdsLoadReportClient {
+final class LoadReportClientImpl implements LoadReportClient {
 
   @VisibleForTesting
   static final String TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD
@@ -70,32 +71,31 @@ final class XdsLoadReportClientImpl implements XdsLoadReportClient {
   private final Stopwatch retryStopwatch;
   private final ChannelLogger logger;
   private final BackoffPolicy.Provider backoffPolicyProvider;
-  private final StatsStore statsStore;
+  private final LoadStatsStore loadStatsStore;
 
   private boolean started;
   @Nullable
   private BackoffPolicy lrsRpcRetryPolicy;
   @Nullable
   private ScheduledHandle lrsRpcRetryTimer;
   @Nullable
   private LrsStream lrsStream;
   @Nullable
-  private XdsLoadReportCallback callback;
+  private LoadReportCallback callback;
 
-  XdsLoadReportClientImpl(ManagedChannel channel,
+  private LoadReportClientImpl(ManagedChannel channel,
       Helper helper,
       BackoffPolicy.Provider backoffPolicyProvider,
-      StatsStore statsStore) {
-    this(channel, helper, GrpcUtil.STOPWATCH_SUPPLIER, backoffPolicyProvider, statsStore);
+      LoadStatsStore loadStatsStore) {
+    this(channel, helper, GrpcUtil.STOPWATCH_SUPPLIER, backoffPolicyProvider, loadStatsStore);
   }
 
   @VisibleForTesting
-  XdsLoadReportClientImpl(ManagedChannel channel,
+  LoadReportClientImpl(ManagedChannel channel,
       Helper helper,
       Supplier<Stopwatch> stopwatchSupplier,
       BackoffPolicy.Provider backoffPolicyProvider,
-      StatsStore statsStore) {
+      LoadStatsStore loadStatsStore) {
     this.channel = checkNotNull(channel, "channel");
     this.serviceName = checkNotNull(helper.getAuthority(), "serviceName");
     this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
@@ -104,12 +104,12 @@ final class XdsLoadReportClientImpl implements XdsLoadReportClient {
     this.logger = checkNotNull(helper.getChannelLogger(), "logger");
     this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
     this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
-    this.statsStore = checkNotNull(statsStore, "statsStore");
+    this.loadStatsStore = checkNotNull(loadStatsStore, "loadStatsStore");
     started = false;
   }
 
   @Override
-  public void startLoadReporting(XdsLoadReportCallback callback) {
+  public void startLoadReporting(LoadReportCallback callback) {
     if (started) {
       return;
     }
@@ -235,7 +235,7 @@ final class XdsLoadReportClientImpl implements XdsLoadReportClient {
       long interval = reportStopwatch.elapsed(TimeUnit.NANOSECONDS);
       reportStopwatch.reset().start();
       ClusterStats report =
-          statsStore.generateLoadReport()
+          loadStatsStore.generateLoadReport()
              .toBuilder()
              .setClusterName(clusterName)
              .setLoadReportInterval(Durations.fromNanos(interval))
@@ -346,25 +346,29 @@ final class XdsLoadReportClientImpl implements XdsLoadReportClient {
     }
   }
 
-  abstract static class XdsLoadReportClientFactory {
+  /**
+   * Factory class for creating {@link LoadReportClient} instances.
+   */
+  abstract static class LoadReportClientFactory {
 
-    private static final XdsLoadReportClientFactory DEFAULT_INSTANCE =
-        new XdsLoadReportClientFactory() {
+    private static final LoadReportClientFactory DEFAULT_INSTANCE =
+        new LoadReportClientFactory() {
          @Override
-          XdsLoadReportClient createLoadReportClient(
+          LoadReportClient createLoadReportClient(
              ManagedChannel channel,
              Helper helper,
              Provider backoffPolicyProvider,
-              StatsStore statsStore) {
-            return new XdsLoadReportClientImpl(channel, helper, backoffPolicyProvider, statsStore);
+              LoadStatsStore loadStatsStore) {
+            return new LoadReportClientImpl(channel, helper, backoffPolicyProvider,
+                loadStatsStore);
          }
        };
 
-    static XdsLoadReportClientFactory getInstance() {
+    static LoadReportClientFactory getInstance() {
      return DEFAULT_INSTANCE;
    }
 
-    abstract XdsLoadReportClient createLoadReportClient(ManagedChannel channel, Helper helper,
-        BackoffPolicy.Provider backoffPolicyProvider, StatsStore statsStore);
+    abstract LoadReportClient createLoadReportClient(ManagedChannel channel, Helper helper,
+        BackoffPolicy.Provider backoffPolicyProvider, LoadStatsStore loadStatsStore);
   }
 }
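The hunk at old line 235 shows how each periodic report is assembled: the store produces the raw stats, and the LRS client stamps on the fields that are opaque to the store. Restated as a standalone sketch (the helper name and values are made up; the proto calls are the ones visible in the diff):

import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;

// Stamps the store-opaque fields onto a ClusterStats produced by a LoadStatsStore.
final class ReportAssemblySketch {
  static ClusterStats finishReport(ClusterStats fromStore, String clusterName, long intervalNanos) {
    return fromStore.toBuilder()
        .setClusterName(clusterName)                           // set by the LRS client, not the store
        .setLoadReportInterval(Durations.fromNanos(intervalNanos))
        .build();
  }
}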


@@ -0,0 +1,110 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import javax.annotation.Nullable;
/**
* Interface for client side load stats store. An {@code LoadStatsStore} maintains load stats for
* a service cluster (i.e., GSLB service) exposed by traffic director from a gRPC client's
* perspective, including dropped calls instructed by traffic director. Load stats for endpoints
* (i.e., Google backends) are aggregated in locality granularity (i.e., Google cluster) while the
* numbers of dropped calls are aggregated in cluster granularity.
*
* <p>An {@code LoadStatsStore} lives the same span of lifecycle as {@link XdsLoadBalancer} and
* only tracks loads for localities exposed by remote traffic director. A proper usage should be
*
* <ol>
* <li>Let {@link LoadStatsStore} track the locality newly exposed by traffic director by
* calling {@link #addLocality(XdsLocality)}.
* <li>Use the locality counter returned by {@link #getLocalityCounter(XdsLocality)} to record
* load stats for the corresponding locality.
* <li>Tell {@link LoadStatsStore} to stop tracking the locality no longer exposed by traffic
* director by calling {@link #removeLocality(XdsLocality)}.
* </ol>
*
* <p>No locality information is needed for recording dropped calls since they are aggregated in
* cluster granularity.
*
* <p>Note implementations should only be responsible for keeping track of loads and generating
* load reports with load data, any load reporting information should be opaque to {@code
* LoadStatsStore} and be set outside.
*/
interface LoadStatsStore {
/**
* Generates a {@link ClusterStats} proto message as the load report based on recorded load stats
* (including RPC counts, backend metrics and dropped calls) for the interval since the previous
* call of this method.
*
* <p>Loads for localities no longer under tracking will not be included in generated load reports
* once all of their loads are completed and reported.
*
* <p>The fields {@code cluster_name} and {@code load_report_interval} in the returned {@link
* ClusterStats} need to be set before it is ready to be sent to the traffic director for load
* reporting.
*
* <p>This method is not thread-safe and should be called from the same synchronized context
* returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}.
*/
ClusterStats generateLoadReport();
/**
* Starts tracking load stats for endpoints in the provided locality. Only load stats for
* endpoints in added localities will be recorded and included in generated load reports.
*
* <p>This method needs to be called at locality updates only for newly assigned localities in
* balancer discovery responses before recording loads for those localities.
*
* <p>This method is not thread-safe and should be called from the same synchronized context
* returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}.
*/
void addLocality(XdsLocality locality);
/**
* Stops tracking load stats for endpoints in the provided locality. gRPC clients are expected not
* to send loads to localities no longer exposed by traffic director. Load stats for endpoints in
* removed localities will no longer be included in future generated load reports after their
* recorded and ongoing loads have been reported.
*
* <p>This method needs to be called at locality updates only for newly removed localities.
* Forgetting calling this method for localities no longer under track will result in memory
* waste and keep including zero-load upstream locality stats in generated load reports.
*
* <p>This method is not thread-safe and should be called from the same synchronized context
* returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}.
*/
void removeLocality(XdsLocality locality);
/**
* Returns the locality counter that does locality level stats aggregation for the provided
* locality. If the provided locality is not tracked, {@code null} will be returned.
*
* <p>This method is thread-safe.
*/
@Nullable
ClientLoadCounter getLocalityCounter(XdsLocality locality);
/**
* Records a drop decision made by a {@link io.grpc.LoadBalancer.SubchannelPicker} instance
* with the provided category. Drops are aggregated in cluster granularity.
*
* <p>This method is thread-safe.
*/
void recordDroppedRequest(String category);
}
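As a usage illustration of the add/record/remove workflow documented above, a sketch of what a caller inside the io.grpc.xds package might do. The locality values are invented, error handling is omitted, and any LoadStatsStore implementation (such as LoadStatsStoreImpl) is assumed to back it.

import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;

final class LoadStatsStoreUsageSketch {
  // Runs in the xDS load balancer's synchronization context, as the javadoc above requires.
  static void trackOneLocality(LoadStatsStore loadStatsStore) {
    XdsLocality locality = new XdsLocality("test_region", "test_zone", "test_subzone");
    loadStatsStore.addLocality(locality);                       // 1. start tracking the new locality
    ClientLoadCounter counter = loadStatsStore.getLocalityCounter(locality);
    // 2. hand `counter` to the per-locality picker / stream tracer factory to record loads.
    loadStatsStore.recordDroppedRequest("throttle");            // drops are cluster-level, thread-safe
    ClusterStats report = loadStatsStore.generateLoadReport();  // cluster_name and interval still unset
    loadStatsStore.removeLocality(locality);                    // 3. stop tracking once unassigned
  }
}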


@@ -33,31 +33,29 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.NotThreadSafe;
 
 /**
- * An {@link XdsLoadStatsStore} instance holds the client side load stats for a cluster.
+ * An {@link LoadStatsStoreImpl} instance holds the load stats for a cluster from an gRPC
+ * client's perspective by maintaining a set of locality counters for each locality it is tracking
+ * loads for.
  */
 @NotThreadSafe
-final class XdsLoadStatsStore implements StatsStore {
+final class LoadStatsStoreImpl implements LoadStatsStore {
   private final ConcurrentMap<XdsLocality, ClientLoadCounter> localityLoadCounters;
-  // Cluster level dropped request counts for each category specified in the DropOverload policy.
+  // Cluster level dropped request counts for each category decision made by xDS load balancer.
   private final ConcurrentMap<String, AtomicLong> dropCounters;
 
-  XdsLoadStatsStore() {
+  LoadStatsStoreImpl() {
     this(new ConcurrentHashMap<XdsLocality, ClientLoadCounter>(),
         new ConcurrentHashMap<String, AtomicLong>());
   }
 
   @VisibleForTesting
-  XdsLoadStatsStore(ConcurrentMap<XdsLocality, ClientLoadCounter> localityLoadCounters,
+  LoadStatsStoreImpl(ConcurrentMap<XdsLocality, ClientLoadCounter> localityLoadCounters,
       ConcurrentMap<String, AtomicLong> dropCounters) {
     this.localityLoadCounters = checkNotNull(localityLoadCounters, "localityLoadCounters");
     this.dropCounters = checkNotNull(dropCounters, "dropCounters");
   }
 
-  /**
-   * Generates a {@link ClusterStats} containing client side load stats and backend metrics
-   * (if any) in locality granularity.
-   */
   @Override
   public ClusterStats generateLoadReport() {
     ClusterStats.Builder statsBuilder = ClusterStats.newBuilder();
@@ -96,10 +94,6 @@ final class XdsLoadStatsStore implements StatsStore {
     return statsBuilder.build();
   }
 
-  /**
-   * Create a {@link ClientLoadCounter} for the provided locality or make it active if already in
-   * this {@link XdsLoadStatsStore}.
-   */
   @Override
   public void addLocality(final XdsLocality locality) {
     ClientLoadCounter counter = localityLoadCounters.get(locality);
@@ -112,10 +106,6 @@ final class XdsLoadStatsStore implements StatsStore {
     }
   }
 
-  /**
-   * Deactivate the {@link ClientLoadCounter} for the provided locality in by this
-   * {@link XdsLoadStatsStore}.
-   */
   @Override
   public void removeLocality(final XdsLocality locality) {
     ClientLoadCounter counter = localityLoadCounters.get(locality);


@@ -77,7 +77,7 @@ interface LocalityStore {
   void updateOobMetricsReportInterval(long reportIntervalNano);
 
-  StatsStore getStatsStore();
+  LoadStatsStore getLoadStatsStore();
 
   final class LocalityStoreImpl implements LocalityStore {
 
     private static final String ROUND_ROBIN = "round_robin";
@@ -86,7 +86,7 @@ interface LocalityStore {
     private final PickerFactory pickerFactory;
     private final LoadBalancerProvider loadBalancerProvider;
     private final ThreadSafeRandom random;
-    private final StatsStore statsStore;
+    private final LoadStatsStore loadStatsStore;
     private final OrcaPerRequestUtil orcaPerRequestUtil;
     private final OrcaOobUtil orcaOobUtil;
 
@@ -96,7 +96,7 @@ interface LocalityStore {
     LocalityStoreImpl(Helper helper, LoadBalancerRegistry lbRegistry) {
       this(helper, pickerFactoryImpl, lbRegistry, ThreadSafeRandom.ThreadSafeRandomImpl.instance,
-          new XdsLoadStatsStore(), OrcaPerRequestUtil.getInstance(), OrcaOobUtil.getInstance());
+          new LoadStatsStoreImpl(), OrcaPerRequestUtil.getInstance(), OrcaOobUtil.getInstance());
     }
 
     @VisibleForTesting
@@ -105,7 +105,7 @@ interface LocalityStore {
         PickerFactory pickerFactory,
         LoadBalancerRegistry lbRegistry,
         ThreadSafeRandom random,
-        StatsStore statsStore,
+        LoadStatsStore loadStatsStore,
         OrcaPerRequestUtil orcaPerRequestUtil,
         OrcaOobUtil orcaOobUtil) {
       this.helper = checkNotNull(helper, "helper");
@@ -114,7 +114,7 @@ interface LocalityStore {
           lbRegistry.getProvider(ROUND_ROBIN),
           "Unable to find '%s' LoadBalancer", ROUND_ROBIN);
       this.random = checkNotNull(random, "random");
-      this.statsStore = checkNotNull(statsStore, "statsStore");
+      this.loadStatsStore = checkNotNull(loadStatsStore, "loadStatsStore");
       this.orcaPerRequestUtil = checkNotNull(orcaPerRequestUtil, "orcaPerRequestUtil");
       this.orcaOobUtil = checkNotNull(orcaOobUtil, "orcaOobUtil");
     }
@@ -129,15 +129,15 @@ interface LocalityStore {
       final ImmutableList<DropOverload> dropOverloads;
       final SubchannelPicker delegate;
       final ThreadSafeRandom random;
-      final StatsStore statsStore;
+      final LoadStatsStore loadStatsStore;
 
       DroppablePicker(
           ImmutableList<DropOverload> dropOverloads, SubchannelPicker delegate,
-          ThreadSafeRandom random, StatsStore statsStore) {
+          ThreadSafeRandom random, LoadStatsStore loadStatsStore) {
         this.dropOverloads = dropOverloads;
         this.delegate = delegate;
         this.random = random;
-        this.statsStore = statsStore;
+        this.loadStatsStore = loadStatsStore;
       }
 
       @Override
@@ -145,7 +145,7 @@ interface LocalityStore {
         for (DropOverload dropOverload : dropOverloads) {
           int rand = random.nextInt(1000_000);
           if (rand < dropOverload.dropsPerMillion) {
-            statsStore.recordDroppedRequest(dropOverload.category);
+            loadStatsStore.recordDroppedRequest(dropOverload.category);
             return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
                 "dropped by loadbalancer: " + dropOverload.toString()));
           }
@@ -184,7 +184,7 @@ interface LocalityStore {
     public void reset() {
       for (XdsLocality locality : localityMap.keySet()) {
         localityMap.get(locality).shutdown();
-        statsStore.removeLocality(locality);
+        loadStatsStore.removeLocality(locality);
       }
       localityMap = ImmutableMap.of();
     }
@@ -221,9 +221,10 @@ interface LocalityStore {
               oldLocalityLbInfo.childBalancer,
               childHelper);
         } else {
-          statsStore.addLocality(newLocality);
+          loadStatsStore.addLocality(newLocality);
           childHelper =
-              new ChildHelper(newLocality, statsStore.getLocalityCounter(newLocality), orcaOobUtil);
+              new ChildHelper(newLocality, loadStatsStore.getLocalityCounter(newLocality),
+                  orcaOobUtil);
           localityLbInfo =
               new LocalityLbInfo(
                  localityInfoMap.get(newLocality).localityWeight,
@@ -260,7 +261,7 @@ interface LocalityStore {
         @Override
         public void run() {
           for (XdsLocality locality : toRemove) {
-            statsStore.removeLocality(locality);
+            loadStatsStore.removeLocality(locality);
           }
         }
       });
@@ -272,8 +273,8 @@ interface LocalityStore {
     }
 
     @Override
-    public StatsStore getStatsStore() {
-      return statsStore;
+    public LoadStatsStore getLoadStatsStore() {
+      return loadStatsStore;
     }
 
     @Override
@@ -348,7 +349,7 @@ interface LocalityStore {
       }
 
       if (!dropOverloads.isEmpty()) {
-        picker = new DroppablePicker(dropOverloads, picker, random, statsStore);
+        picker = new DroppablePicker(dropOverloads, picker, random, loadStatsStore);
         if (state == null) {
           state = IDLE;
         }
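The DroppablePicker hunk above carries the drop policy: each DropOverload category drops a request with probability dropsPerMillion / 1,000,000, and every drop is recorded against its category. The following standalone sketch restates that decision with local stand-in types so the arithmetic is explicit; it is not the repository's DroppablePicker.

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

final class DropDecisionSketch {

  static final class DropOverload {
    final String category;
    final int dropsPerMillion;

    DropOverload(String category, int dropsPerMillion) {
      this.category = category;
      this.dropsPerMillion = dropsPerMillion;
    }
  }

  // Returns the matched category if the request should be dropped, or null to let it proceed.
  static String maybeDrop(List<DropOverload> dropOverloads) {
    for (DropOverload dropOverload : dropOverloads) {
      int rand = ThreadLocalRandom.current().nextInt(1_000_000);
      if (rand < dropOverload.dropsPerMillion) {
        // The real picker records the drop via LoadStatsStore.recordDroppedRequest(category)
        // and returns PickResult.withDrop(...), as shown in the diff above.
        return dropOverload.category;
      }
    }
    return null;
  }
}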


@@ -1,78 +0,0 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import javax.annotation.Nullable;
/**
* Interface for client side load stats store. A {@code StatsStore} implementation should only be
* responsible for keeping track of load data aggregation, any load reporting information should
* be opaque to {@code StatsStore} and be set outside.
*/
interface StatsStore {
/**
* Generates a {@link ClusterStats} containing load stats and backend metrics in locality
* granularity, as well service level drop stats for the interval since the previous call of
* this method. The fields cluster_name and load_report_interval in the returned
* {@link ClusterStats} needs to be set before it is ready to be sent to the traffic directory
* for load reporting.
*
* <p>This method should be called in the same synchronized context that
* {@link XdsLoadBalancer.Helper#getSynchronizationContext} returns.
*/
ClusterStats generateLoadReport();
/**
* Tracks load stats for endpoints in the provided locality. To be called upon balancer locality
* updates only for newly assigned localities. Only load stats for endpoints in added localities
* will be reported to the remote balancer. This method needs to be called at locality updates
* only for newly assigned localities in balancer discovery responses.
*
* <p>This method is not thread-safe and should be called from the same synchronized context
* returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}.
*/
void addLocality(XdsLocality locality);
/**
* Stops tracking load stats for endpoints in the provided locality. To be called upon balancer
* locality updates only for newly removed localities. Load stats for endpoints in removed
* localities will no longer be reported to the remote balancer when client stop sending loads
* to them.
*
* <p>This method is not thread-safe and should be called from the same synchronized context
* returned by {@link XdsLoadBalancer.Helper#getSynchronizationContext}.
*/
void removeLocality(XdsLocality locality);
/**
* Returns the {@link ClientLoadCounter} that does locality level stats aggregation for the
* provided locality. If the provided locality is not tracked, {@code null} will be returned.
*
* <p>This method is thread-safe.
*/
@Nullable
ClientLoadCounter getLocalityCounter(XdsLocality locality);
/**
* Records a drop decision made by a {@link io.grpc.LoadBalancer.SubchannelPicker} instance
* with the provided category. Drops are aggregated in service level.
*
* <p>This method is thread-safe.
*/
void recordDroppedRequest(String category);
}


@@ -43,10 +43,10 @@ import io.grpc.internal.BackoffPolicy;
 import io.grpc.internal.GrpcAttributes;
 import io.grpc.internal.ServiceConfigUtil.LbConfig;
 import io.grpc.util.ForwardingLoadBalancerHelper;
+import io.grpc.xds.LoadReportClient.LoadReportCallback;
+import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory;
 import io.grpc.xds.LocalityStore.LocalityStoreImpl;
 import io.grpc.xds.XdsComms.AdsStreamCallback;
-import io.grpc.xds.XdsLoadReportClient.XdsLoadReportCallback;
-import io.grpc.xds.XdsLoadReportClientImpl.XdsLoadReportClientFactory;
 import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
 import java.util.List;
 import java.util.Map;
@@ -65,10 +65,10 @@ final class XdsLoadBalancer extends LoadBalancer {
   private final LoadBalancerRegistry lbRegistry;
   private final FallbackManager fallbackManager;
   private final BackoffPolicy.Provider backoffPolicyProvider;
-  private final XdsLoadReportClientFactory lrsClientFactory;
+  private final LoadReportClientFactory lrsClientFactory;
 
   @Nullable
-  private XdsLoadReportClient lrsClient;
+  private LoadReportClient lrsClient;
   @Nullable
   private XdsLbState xdsLbState;
 
   private final AdsStreamCallback adsStreamCallback = new AdsStreamCallback() {
@@ -100,8 +100,8 @@ final class XdsLoadBalancer extends LoadBalancer {
     }
   };
 
-  private final XdsLoadReportCallback lrsCallback =
-      new XdsLoadReportCallback() {
+  private final LoadReportCallback lrsCallback =
+      new LoadReportCallback() {
         @Override
         public void onReportResponse(long reportIntervalNano) {
@@ -113,14 +113,14 @@ final class XdsLoadBalancer extends LoadBalancer {
   XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry,
       BackoffPolicy.Provider backoffPolicyProvider) {
-    this(helper, lbRegistry, backoffPolicyProvider, XdsLoadReportClientFactory.getInstance(),
+    this(helper, lbRegistry, backoffPolicyProvider, LoadReportClientFactory.getInstance(),
         new FallbackManager(helper, lbRegistry));
   }
 
   private XdsLoadBalancer(Helper helper,
       LoadBalancerRegistry lbRegistry,
       BackoffPolicy.Provider backoffPolicyProvider,
-      XdsLoadReportClientFactory lrsClientFactory,
+      LoadReportClientFactory lrsClientFactory,
       FallbackManager fallbackManager) {
     this(helper, lbRegistry, backoffPolicyProvider, lrsClientFactory, fallbackManager,
         new LocalityStoreImpl(new LocalityStoreHelper(helper, fallbackManager), lbRegistry));
@@ -130,7 +130,7 @@ final class XdsLoadBalancer extends LoadBalancer {
   XdsLoadBalancer(Helper helper,
       LoadBalancerRegistry lbRegistry,
       BackoffPolicy.Provider backoffPolicyProvider,
-      XdsLoadReportClientFactory lrsClientFactory,
+      LoadReportClientFactory lrsClientFactory,
       FallbackManager fallbackManager,
       LocalityStore localityStore) {
     this.helper = checkNotNull(helper, "helper");
@@ -203,7 +203,7 @@ final class XdsLoadBalancer extends LoadBalancer {
       lbChannel = initLbChannel(helper, newBalancerName);
       lrsClient =
           lrsClientFactory.createLoadReportClient(lbChannel, helper, backoffPolicyProvider,
-              localityStore.getStatsStore());
+              localityStore.getLoadStatsStore());
     } else if (!newBalancerName.equals(xdsLbState.balancerName)) {
       lrsClient.stopLoadReporting();
       ManagedChannel oldChannel =
@@ -214,7 +214,7 @@ final class XdsLoadBalancer extends LoadBalancer {
       lbChannel = initLbChannel(helper, newBalancerName);
       lrsClient =
           lrsClientFactory.createLoadReportClient(lbChannel, helper, backoffPolicyProvider,
-              localityStore.getStatsStore());
+              localityStore.getLoadStatsStore());
     } else if (!Objects.equal(
         getPolicyNameOrNull(childPolicy),
         getPolicyNameOrNull(xdsLbState.childPolicy))) {


@@ -53,7 +53,7 @@ import io.grpc.internal.BackoffPolicy;
 import io.grpc.internal.FakeClock;
 import io.grpc.stub.StreamObserver;
 import io.grpc.testing.GrpcCleanupRule;
-import io.grpc.xds.XdsLoadReportClient.XdsLoadReportCallback;
+import io.grpc.xds.LoadReportClient.LoadReportCallback;
 import java.text.MessageFormat;
 import java.util.ArrayDeque;
 import java.util.concurrent.ThreadLocalRandom;
@@ -73,10 +73,10 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 /**
- * Unit tests for {@link XdsLoadReportClientImpl}.
+ * Unit tests for {@link LoadReportClientImpl}.
 */
 @RunWith(JUnit4.class)
-public class XdsLoadReportClientImplTest {
+public class LoadReportClientImplTest {
 
   private static final String SERVICE_AUTHORITY = "api.google.com";
   private static final String CLUSTER_NAME = "gslb-namespace:gslb-service-name";
@@ -85,7 +85,7 @@ public class XdsLoadReportClientImplTest {
         @Override
         public boolean shouldAccept(Runnable command) {
           return command.toString()
-              .contains(XdsLoadReportClientImpl.LoadReportingTask.class.getSimpleName());
+              .contains(LoadReportClientImpl.LoadReportingTask.class.getSimpleName());
         }
       };
   private static final FakeClock.TaskFilter LRS_RPC_RETRY_TASK_FILTER =
@@ -93,7 +93,7 @@ public class XdsLoadReportClientImplTest {
        @Override
        public boolean shouldAccept(Runnable command) {
          return command.toString()
-              .contains(XdsLoadReportClientImpl.LrsRpcRetryTask.class.getSimpleName());
+              .contains(LoadReportClientImpl.LrsRpcRetryTask.class.getSimpleName());
        }
      };
   private static final Locality TEST_LOCALITY =
@@ -102,6 +102,14 @@ public class XdsLoadReportClientImplTest {
          .setZone("test_zone")
          .setSubZone("test_subzone")
          .build();
+  private static final LoadStatsRequest EXPECTED_INITIAL_REQ = LoadStatsRequest.newBuilder()
+      .setNode(Node.newBuilder()
+          .setMetadata(Struct.newBuilder()
+              .putFields(
+                  LoadReportClientImpl.TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD,
+                  Value.newBuilder().setStringValue(SERVICE_AUTHORITY).build())))
+      .build();
+
   @Rule
   public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
   private final SynchronizationContext syncContext = new SynchronizationContext(
@@ -123,46 +131,28 @@ public class XdsLoadReportClientImplTest {
       log(level, MessageFormat.format(template, args));
     }
   };
-  private LoadReportingServiceGrpc.LoadReportingServiceImplBase mockLoadReportingService;
   private final FakeClock fakeClock = new FakeClock();
   private final ArrayDeque<StreamObserver<LoadStatsRequest>> lrsRequestObservers =
       new ArrayDeque<>();
-  @Captor
-  private ArgumentCaptor<StreamObserver<LoadStatsResponse>> lrsResponseObserverCaptor;
   @Mock
   private Helper helper;
   @Mock
   private BackoffPolicy.Provider backoffPolicyProvider;
-  private static final LoadStatsRequest EXPECTED_INITIAL_REQ = LoadStatsRequest.newBuilder()
-      .setNode(Node.newBuilder()
-          .setMetadata(Struct.newBuilder()
-              .putFields(
-                  XdsLoadReportClientImpl.TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD,
-                  Value.newBuilder().setStringValue(SERVICE_AUTHORITY).build())))
-      .build();
   @Mock
   private BackoffPolicy backoffPolicy1;
-  private ManagedChannel channel;
-  private XdsLoadReportClientImpl lrsClient;
   @Mock
   private BackoffPolicy backoffPolicy2;
   @Mock
-  private StatsStore statsStore;
+  private LoadStatsStore loadStatsStore;
   @Mock
-  private XdsLoadReportCallback callback;
+  private LoadReportCallback callback;
+  @Captor
+  private ArgumentCaptor<StreamObserver<LoadStatsResponse>> lrsResponseObserverCaptor;
 
-  private static ClusterStats buildEmptyClusterStats(long loadReportIntervalNanos) {
-    return ClusterStats.newBuilder()
-        .setClusterName(CLUSTER_NAME)
-        .setLoadReportInterval(Durations.fromNanos(loadReportIntervalNanos)).build();
-  }
-
-  private static LoadStatsResponse buildLrsResponse(long loadReportIntervalNanos) {
-    return LoadStatsResponse.newBuilder()
-        .addClusters(CLUSTER_NAME)
-        .setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNanos)).build();
-  }
+  private LoadReportingServiceGrpc.LoadReportingServiceImplBase mockLoadReportingService;
+  private ManagedChannel channel;
+  private LoadReportClientImpl lrsClient;
 
   @SuppressWarnings("unchecked")
   @Before
@@ -203,9 +193,9 @@ public class XdsLoadReportClientImplTest {
     when(backoffPolicy2.nextBackoffNanos())
         .thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L));
     lrsClient =
-        new XdsLoadReportClientImpl(channel, helper, fakeClock.getStopwatchSupplier(),
+        new LoadReportClientImpl(channel, helper, fakeClock.getStopwatchSupplier(),
            backoffPolicyProvider,
-            statsStore);
+            loadStatsStore);
     lrsClient.startLoadReporting(callback);
   }
@@ -214,28 +204,6 @@ public class XdsLoadReportClientImplTest {
     lrsClient.stopLoadReporting();
   }
 
-  private void assertNextReport(InOrder inOrder, StreamObserver<LoadStatsRequest> requestObserver,
-      ClusterStats expectedStats) {
-    long loadReportIntervalNanos = Durations.toNanos(expectedStats.getLoadReportInterval());
-    assertEquals(0, fakeClock.forwardTime(loadReportIntervalNanos - 1, TimeUnit.NANOSECONDS));
-    inOrder.verifyNoMoreInteractions();
-    assertEquals(1, fakeClock.forwardTime(1, TimeUnit.NANOSECONDS));
-    // A second load report is scheduled upon the first is sent.
-    assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
-    inOrder.verify(statsStore).generateLoadReport();
-    ArgumentCaptor<LoadStatsRequest> reportCaptor = ArgumentCaptor.forClass(null);
-    inOrder.verify(requestObserver).onNext(reportCaptor.capture());
-    LoadStatsRequest report = reportCaptor.getValue();
-    assertEquals(report.getNode(), Node.newBuilder()
-        .setMetadata(Struct.newBuilder()
-            .putFields(
-                XdsLoadReportClientImpl.TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD,
-                Value.newBuilder().setStringValue(SERVICE_AUTHORITY).build()))
-        .build());
-    assertEquals(1, report.getClusterStatsCount());
-    assertThat(report.getClusterStats(0)).isEqualTo(expectedStats);
-  }
-
   @Test
   public void loadReportInitialRequest() {
     verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
@@ -277,8 +245,8 @@ public class XdsLoadReportClientImplTest {
     StreamObserver<LoadStatsResponse> responseObserver = lrsResponseObserverCaptor.getValue();
     assertThat(lrsRequestObservers).hasSize(1);
     StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
-    when(statsStore.generateLoadReport()).thenReturn(ClusterStats.newBuilder().build());
-    InOrder inOrder = inOrder(requestObserver, statsStore);
+    when(loadStatsStore.generateLoadReport()).thenReturn(ClusterStats.newBuilder().build());
+    InOrder inOrder = inOrder(requestObserver, loadStatsStore);
     inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
     assertThat(logs).containsExactly("DEBUG: Initial LRS request sent: " + EXPECTED_INITIAL_REQ);
     logs.poll();
@@ -297,9 +265,9 @@ public class XdsLoadReportClientImplTest {
     assertThat(lrsRequestObservers).hasSize(1);
     StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
 
-    when(statsStore.generateLoadReport()).thenReturn(ClusterStats.newBuilder().build());
-    InOrder inOrder = inOrder(requestObserver, statsStore);
+    when(loadStatsStore.generateLoadReport()).thenReturn(ClusterStats.newBuilder().build());
+    InOrder inOrder = inOrder(requestObserver, loadStatsStore);
     inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
     assertThat(logs).containsExactly("DEBUG: Initial LRS request sent: " + EXPECTED_INITIAL_REQ);
     logs.poll();
@@ -325,7 +293,7 @@ public class XdsLoadReportClientImplTest {
     StreamObserver<LoadStatsResponse> responseObserver = lrsResponseObserverCaptor.getValue();
     assertThat(lrsRequestObservers).hasSize(1);
     StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
-    InOrder inOrder = inOrder(requestObserver, statsStore);
+    InOrder inOrder = inOrder(requestObserver, loadStatsStore);
     inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
 
     long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
@@ -366,7 +334,7 @@ public class XdsLoadReportClientImplTest {
            .setDroppedCount(0))
        .setTotalDroppedRequests(0)
        .build();
-    when(statsStore.generateLoadReport()).thenReturn(expectedStats1, expectedStats2);
+    when(loadStatsStore.generateLoadReport()).thenReturn(expectedStats1, expectedStats2);
 
     responseObserver.onNext(buildLrsResponse(1362));
     assertNextReport(inOrder, requestObserver, expectedStats1);
@@ -519,4 +487,38 @@ public class XdsLoadReportClientImplTest {
     inOrder.verify(requestObserver, never()).onNext(any(LoadStatsRequest.class));
     assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
   }
+
+  private static ClusterStats buildEmptyClusterStats(long loadReportIntervalNanos) {
+    return ClusterStats.newBuilder()
+        .setClusterName(CLUSTER_NAME)
+        .setLoadReportInterval(Durations.fromNanos(loadReportIntervalNanos)).build();
+  }
+
+  private static LoadStatsResponse buildLrsResponse(long loadReportIntervalNanos) {
+    return LoadStatsResponse.newBuilder()
+        .addClusters(CLUSTER_NAME)
+        .setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNanos)).build();
+  }
+
+  private void assertNextReport(InOrder inOrder, StreamObserver<LoadStatsRequest> requestObserver,
+      ClusterStats expectedStats) {
+    long loadReportIntervalNanos = Durations.toNanos(expectedStats.getLoadReportInterval());
+    assertEquals(0, fakeClock.forwardTime(loadReportIntervalNanos - 1, TimeUnit.NANOSECONDS));
+    inOrder.verifyNoMoreInteractions();
+    assertEquals(1, fakeClock.forwardTime(1, TimeUnit.NANOSECONDS));
+    // A second load report is scheduled upon the first is sent.
+    assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
+    inOrder.verify(loadStatsStore).generateLoadReport();
+    ArgumentCaptor<LoadStatsRequest> reportCaptor = ArgumentCaptor.forClass(null);
+    inOrder.verify(requestObserver).onNext(reportCaptor.capture());
+    LoadStatsRequest report = reportCaptor.getValue();
+    assertEquals(report.getNode(), Node.newBuilder()
+        .setMetadata(Struct.newBuilder()
+            .putFields(
+                LoadReportClientImpl.TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD,
+                Value.newBuilder().setStringValue(SERVICE_AUTHORITY).build()))
+        .build());
+    assertEquals(1, report.getClusterStatsCount());
+    assertThat(report.getClusterStats(0)).isEqualTo(expectedStats);
+  }
 }


@@ -40,22 +40,22 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/** Unit tests for {@link XdsLoadStatsStore}. */
+/** Unit tests for {@link LoadStatsStore}. */
 @RunWith(JUnit4.class)
-public class XdsLoadStatsStoreTest {
+public class LoadStatsStoreImplTest {
   private static final XdsLocality LOCALITY1 =
       new XdsLocality("test_region1", "test_zone", "test_subzone");
   private static final XdsLocality LOCALITY2 =
       new XdsLocality("test_region2", "test_zone", "test_subzone");
   private ConcurrentMap<XdsLocality, ClientLoadCounter> localityLoadCounters;
   private ConcurrentMap<String, AtomicLong> dropCounters;
-  private XdsLoadStatsStore loadStore;
+  private LoadStatsStore loadStatsStore;
 
   @Before
   public void setUp() {
     localityLoadCounters = new ConcurrentHashMap<>();
     dropCounters = new ConcurrentHashMap<>();
-    loadStore = new XdsLoadStatsStore(localityLoadCounters, dropCounters);
+    loadStatsStore = new LoadStatsStoreImpl(localityLoadCounters, dropCounters);
   }
 
   private static List<EndpointLoadMetricStats> buildEndpointLoadMetricStatsList(
@@ -155,25 +155,25 @@ public class XdsLoadStatsStoreTest {
   @Test
   public void addAndGetAndRemoveLocality() {
-    loadStore.addLocality(LOCALITY1);
+    loadStatsStore.addLocality(LOCALITY1);
     assertThat(localityLoadCounters).containsKey(LOCALITY1);
 
     // Adding the same locality counter again causes an exception.
     try {
-      loadStore.addLocality(LOCALITY1);
+      loadStatsStore.addLocality(LOCALITY1);
       Assert.fail();
     } catch (IllegalStateException expected) {
       assertThat(expected).hasMessageThat()
          .contains("An active counter for locality " + LOCALITY1 + " already exists");
     }
 
-    assertThat(loadStore.getLocalityCounter(LOCALITY1))
+    assertThat(loadStatsStore.getLocalityCounter(LOCALITY1))
        .isSameInstanceAs(localityLoadCounters.get(LOCALITY1));
-    assertThat(loadStore.getLocalityCounter(LOCALITY2)).isNull();
+    assertThat(loadStatsStore.getLocalityCounter(LOCALITY2)).isNull();
 
     // Removing an non-existing locality counter causes an exception.
     try {
-      loadStore.removeLocality(LOCALITY2);
+      loadStatsStore.removeLocality(LOCALITY2);
       Assert.fail();
     } catch (IllegalStateException expected) {
       assertThat(expected).hasMessageThat()
@@ -181,12 +181,12 @@ public class XdsLoadStatsStoreTest {
     }
 
     // Removing the locality counter only mark it as inactive, but not throw it away.
-    loadStore.removeLocality(LOCALITY1);
+    loadStatsStore.removeLocality(LOCALITY1);
     assertThat(localityLoadCounters.get(LOCALITY1).isActive()).isFalse();
 
     // Removing an inactive locality counter causes an exception.
     try {
-      loadStore.removeLocality(LOCALITY1);
+      loadStatsStore.removeLocality(LOCALITY1);
       Assert.fail();
     } catch (IllegalStateException expected) {
       assertThat(expected).hasMessageThat()
@@ -194,7 +194,7 @@ public class XdsLoadStatsStoreTest {
     }
 
     // Adding it back simply mark it as active again.
-    loadStore.addLocality(LOCALITY1);
+    loadStatsStore.addLocality(LOCALITY1);
     assertThat(localityLoadCounters.get(LOCALITY1).isActive()).isTrue();
   }
@@ -204,7 +204,7 @@ public class XdsLoadStatsStoreTest {
     ClientLoadCounter inactiveCounter = new ClientLoadCounter();
     inactiveCounter.setActive(false);
     localityLoadCounters.put(LOCALITY2, inactiveCounter);
-    loadStore.generateLoadReport();
+    loadStatsStore.generateLoadReport();
     assertThat(localityLoadCounters).containsKey(LOCALITY1);
     assertThat(localityLoadCounters).doesNotContainKey(LOCALITY2);
   }
@@ -241,7 +241,7 @@ public class XdsLoadStatsStoreTest {
                buildEndpointLoadMetricStatsList(metrics2))
        ),
        null);
-    assertClusterStatsEqual(expectedReport, loadStore.generateLoadReport());
+    assertClusterStatsEqual(expectedReport, loadStatsStore.generateLoadReport());
 
     expectedReport =
        buildClusterStats(
@@ -250,7 +250,7 @@ public class XdsLoadStatsStoreTest {
            buildUpstreamLocalityStats(LOCALITY2, 0, 432, 0, 0, null)
        ),
        null);
-    assertClusterStatsEqual(expectedReport, loadStore.generateLoadReport());
+    assertClusterStatsEqual(expectedReport, loadStatsStore.generateLoadReport());
   }
 
   @Test
@@ -258,10 +258,10 @@ public class XdsLoadStatsStoreTest {
     int numLbDrop = 123;
     int numThrottleDrop = 456;
     for (int i = 0; i < numLbDrop; i++) {
-      loadStore.recordDroppedRequest("lb");
+      loadStatsStore.recordDroppedRequest("lb");
    }
    for (int i = 0; i < numThrottleDrop; i++) {
-      loadStore.recordDroppedRequest("throttle");
+      loadStatsStore.recordDroppedRequest("throttle");
    }
    assertThat(dropCounters.get("lb").get()).isEqualTo(numLbDrop);
    assertThat(dropCounters.get("throttle").get()).isEqualTo(numThrottleDrop);
@@ -269,7 +269,7 @@ public class XdsLoadStatsStoreTest {
        buildClusterStats(null,
            Arrays.asList(buildDroppedRequests("lb", numLbDrop),
                buildDroppedRequests("throttle", numThrottleDrop)));
-    assertClusterStatsEqual(expectedLoadReport, loadStore.generateLoadReport());
+    assertClusterStatsEqual(expectedLoadReport, loadStatsStore.generateLoadReport());
     assertThat(dropCounters.get("lb").get()).isEqualTo(0);
     assertThat(dropCounters.get("throttle").get()).isEqualTo(0);
   }


@@ -193,7 +193,8 @@ public class LocalityStoreTest {
   @Mock
   private OrcaOobUtil orcaOobUtil;
   private final FakeLoadStatsStore fakeLoadStatsStore = new FakeLoadStatsStore();
-  private final StatsStore statsStore = mock(StatsStore.class, delegatesTo(fakeLoadStatsStore));
+  private final LoadStatsStore loadStatsStore =
+      mock(LoadStatsStore.class, delegatesTo(fakeLoadStatsStore));

   private LocalityStore localityStore;
@@ -216,7 +217,7 @@ public class LocalityStoreTest {
     });
     lbRegistry.register(lbProvider);
     localityStore =
-        new LocalityStoreImpl(helper, pickerFactory, lbRegistry, random, statsStore,
+        new LocalityStoreImpl(helper, pickerFactory, lbRegistry, random, loadStatsStore,
             orcaPerRequestUtil, orcaOobUtil);
   }
@@ -229,24 +230,24 @@ public class LocalityStoreTest {
     localityInfoMap
         .put(locality2, new LocalityInfo(ImmutableList.of(lbEndpoint21, lbEndpoint22), 2));
     localityStore.updateLocalityStore(localityInfoMap);
-    verify(statsStore).addLocality(locality1);
-    verify(statsStore).addLocality(locality2);
+    verify(loadStatsStore).addLocality(locality1);
+    verify(loadStatsStore).addLocality(locality2);

     localityInfoMap
         .put(locality3, new LocalityInfo(ImmutableList.of(lbEndpoint31, lbEndpoint32), 3));
     localityStore.updateLocalityStore(localityInfoMap);
-    verify(statsStore).addLocality(locality3);
+    verify(loadStatsStore).addLocality(locality3);

     localityInfoMap = ImmutableMap
         .of(locality4, new LocalityInfo(ImmutableList.of(lbEndpoint41, lbEndpoint42), 4));
     localityStore.updateLocalityStore(localityInfoMap);
-    verify(statsStore).removeLocality(locality1);
-    verify(statsStore).removeLocality(locality2);
-    verify(statsStore).removeLocality(locality3);
-    verify(statsStore).addLocality(locality4);
+    verify(loadStatsStore).removeLocality(locality1);
+    verify(loadStatsStore).removeLocality(locality2);
+    verify(loadStatsStore).removeLocality(locality3);
+    verify(loadStatsStore).addLocality(locality4);

     localityStore.updateLocalityStore(Collections.EMPTY_MAP);
-    verify(statsStore).removeLocality(locality4);
+    verify(loadStatsStore).removeLocality(locality4);
   }

   @Test
@@ -532,30 +533,30 @@ public class LocalityStoreTest {
     verify(helper).updateBalancingState(same(IDLE), subchannelPickerCaptor.capture());
     int times = 0;
-    InOrder inOrder = inOrder(statsStore);
+    InOrder inOrder = inOrder(loadStatsStore);

     doReturn(365, 1234).when(random).nextInt(1000_000);
     assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs))
         .isEqualTo(PickResult.withNoResult());
     verify(random, times(times += 2)).nextInt(1000_000);
-    inOrder.verify(statsStore, never()).recordDroppedRequest(anyString());
+    inOrder.verify(loadStatsStore, never()).recordDroppedRequest(anyString());

     doReturn(366, 1235).when(random).nextInt(1000_000);
     assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs))
         .isEqualTo(PickResult.withNoResult());
     verify(random, times(times += 2)).nextInt(1000_000);
-    inOrder.verify(statsStore, never()).recordDroppedRequest(anyString());
+    inOrder.verify(loadStatsStore, never()).recordDroppedRequest(anyString());

     doReturn(364, 1234).when(random).nextInt(1000_000);
     assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs).isDrop())
         .isTrue();
     verify(random, times(times += 1)).nextInt(1000_000);
-    inOrder.verify(statsStore).recordDroppedRequest(eq("throttle"));
+    inOrder.verify(loadStatsStore).recordDroppedRequest(eq("throttle"));

     doReturn(365, 1233).when(random).nextInt(1000_000);
     assertThat(subchannelPickerCaptor.getValue().pickSubchannel(pickSubchannelArgs).isDrop())
         .isTrue();
     verify(random, times(times += 2)).nextInt(1000_000);
-    inOrder.verify(statsStore).recordDroppedRequest(eq("lb"));
+    inOrder.verify(loadStatsStore).recordDroppedRequest(eq("lb"));

     // subchannel12 goes to READY
     CreateSubchannelArgs createSubchannelArgs =
@@ -577,25 +578,25 @@ public class LocalityStoreTest {
     assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs)
         .getSubchannel()).isEqualTo(subchannel12);
     verify(random, times(times += 2)).nextInt(1000_000);
-    inOrder.verify(statsStore, never()).recordDroppedRequest(anyString());
+    inOrder.verify(loadStatsStore, never()).recordDroppedRequest(anyString());

     doReturn(366, 1235).when(random).nextInt(1000_000);
     assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs)
         .getSubchannel()).isEqualTo(subchannel12);
     verify(random, times(times += 2)).nextInt(1000_000);
-    inOrder.verify(statsStore, never()).recordDroppedRequest(anyString());
+    inOrder.verify(loadStatsStore, never()).recordDroppedRequest(anyString());

     doReturn(364, 1234).when(random).nextInt(1000_000);
     assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs).isDrop())
         .isTrue();
     verify(random, times(times += 1)).nextInt(1000_000);
-    inOrder.verify(statsStore).recordDroppedRequest(eq("throttle"));
+    inOrder.verify(loadStatsStore).recordDroppedRequest(eq("throttle"));

     doReturn(365, 1233).when(random).nextInt(1000_000);
     assertThat(subchannelPickerCaptor12.getValue().pickSubchannel(pickSubchannelArgs).isDrop())
         .isTrue();
     verify(random, times(times + 2)).nextInt(1000_000);
-    inOrder.verify(statsStore).recordDroppedRequest(eq("lb"));
+    inOrder.verify(loadStatsStore).recordDroppedRequest(eq("lb"));

     inOrder.verifyNoMoreInteractions();
   }
@@ -639,11 +640,11 @@ public class LocalityStoreTest {
     verify(loadBalancers.get("sz1")).shutdown();
     verify(loadBalancers.get("sz2")).shutdown();
-    verify(statsStore).removeLocality(locality1);
-    verify(statsStore).removeLocality(locality2);
+    verify(loadStatsStore).removeLocality(locality1);
+    verify(loadStatsStore).removeLocality(locality2);
   }

-  private static final class FakeLoadStatsStore implements StatsStore {
+  private static final class FakeLoadStatsStore implements LoadStatsStore {

     Map<XdsLocality, ClientLoadCounter> localityCounters = new HashMap<>();
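The fake-backed mock above, mock(LoadStatsStore.class, delegatesTo(fakeLoadStatsStore)), relies on Mockito's delegating default answer: every call runs against the hand-written fake, yet still counts as a mock interaction for verify(). A standalone sketch of that pattern, using a made-up Counter interface instead of the project's types:

    import static org.mockito.AdditionalAnswers.delegatesTo;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    import java.util.concurrent.atomic.AtomicInteger;

    public class DelegatingMockExample {
      // Made-up interface for illustration; stands in for LoadStatsStore.
      interface Counter {
        void record(String category);
      }

      // Hand-written fake providing real behavior, in the spirit of FakeLoadStatsStore above.
      static final class FakeCounter implements Counter {
        final AtomicInteger calls = new AtomicInteger();

        @Override
        public void record(String category) {
          calls.incrementAndGet();
        }
      }

      public static void main(String[] args) {
        FakeCounter fake = new FakeCounter();
        // The mock forwards every call to the fake but still records interactions.
        Counter counter = mock(Counter.class, delegatesTo(fake));

        counter.record("lb");
        verify(counter).record("lb");          // Mockito observed the call
        System.out.println(fake.calls.get());  // prints 1: the fake's code actually ran
      }
    }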


@@ -54,9 +54,9 @@ import io.grpc.internal.JsonParser;
 import io.grpc.internal.testing.StreamRecorder;
 import io.grpc.stub.StreamObserver;
 import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.xds.LoadReportClient.LoadReportCallback;
+import io.grpc.xds.LoadReportClientImpl.LoadReportClientFactory;
 import io.grpc.xds.XdsLoadBalancer.FallbackManager;
-import io.grpc.xds.XdsLoadReportClient.XdsLoadReportCallback;
-import io.grpc.xds.XdsLoadReportClientImpl.XdsLoadReportClientFactory;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -73,7 +73,7 @@ import org.mockito.MockitoAnnotations;
 /**
  * Unit tests for {@link XdsLoadBalancer}, especially for interactions between
- * {@link XdsLoadBalancer} and {@link XdsLoadReportClient}.
+ * {@link XdsLoadBalancer} and {@link LoadReportClient}.
  */
 @RunWith(JUnit4.class)
 public class XdsLoadBalancerWithLrsTest {
@@ -97,11 +97,11 @@ public class XdsLoadBalancerWithLrsTest {
   @Mock
   private LocalityStore localityStore;
   @Mock
-  private XdsLoadReportClientFactory lrsClientFactory;
+  private LoadReportClientFactory lrsClientFactory;
   @Mock
-  private XdsLoadReportClient lrsClient;
+  private LoadReportClient lrsClient;
   @Mock
-  private StatsStore statsStore;
+  private LoadStatsStore loadStatsStore;
   @Mock
   private LoadBalancer fallbackBalancer;
   @Mock
@@ -219,9 +219,9 @@ public class XdsLoadBalancerWithLrsTest {
     when(helper.getChannelLogger()).thenReturn(mock(ChannelLogger.class));
     when(helper.createResolvingOobChannel(anyString()))
         .thenReturn(oobChannel1, oobChannel2, oobChannel3);
-    when(localityStore.getStatsStore()).thenReturn(statsStore);
+    when(localityStore.getLoadStatsStore()).thenReturn(loadStatsStore);
     when(lrsClientFactory.createLoadReportClient(any(ManagedChannel.class), any(Helper.class),
-        any(BackoffPolicy.Provider.class), any(StatsStore.class))).thenReturn(lrsClient);
+        any(BackoffPolicy.Provider.class), any(LoadStatsStore.class))).thenReturn(lrsClient);
     xdsLoadBalancer =
         new XdsLoadBalancer(helper, lbRegistry, backoffPolicyProvider, lrsClientFactory,
@@ -247,7 +247,7 @@ public class XdsLoadBalancerWithLrsTest {
     verify(lrsClientFactory)
         .createLoadReportClient(same(oobChannel1), same(helper), same(backoffPolicyProvider),
-            same(statsStore));
+            same(loadStatsStore));
     assertThat(streamRecorder.getValues()).hasSize(1);

     // Let fallback timer elapse and xDS load balancer enters fallback mode on startup.
@@ -256,13 +256,13 @@ public class XdsLoadBalancerWithLrsTest {
     fakeClock.forwardTime(10, TimeUnit.SECONDS);
     assertThat(fallBackLbHelper).isNotNull();
-    verify(lrsClient, never()).startLoadReporting(any(XdsLoadReportCallback.class));
+    verify(lrsClient, never()).startLoadReporting(any(LoadReportCallback.class));

     // Simulates a syntactically incorrect EDS response.
     serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance());
-    verify(lrsClient, never()).startLoadReporting(any(XdsLoadReportCallback.class));
+    verify(lrsClient, never()).startLoadReporting(any(LoadReportCallback.class));

-    ArgumentCaptor<XdsLoadReportCallback> lrsCallbackCaptor = ArgumentCaptor.forClass(null);
+    ArgumentCaptor<LoadReportCallback> lrsCallbackCaptor = ArgumentCaptor.forClass(null);
     // Simulate a syntactically correct EDS response.
     DiscoveryResponse edsResponse =
@@ -303,9 +303,9 @@ public class XdsLoadBalancerWithLrsTest {
     inOrder.verify(lrsClientFactory)
         .createLoadReportClient(same(oobChannel1), same(helper), same(backoffPolicyProvider),
-            same(statsStore));
+            same(loadStatsStore));
     assertThat(streamRecorder.getValues()).hasSize(1);
-    inOrder.verify(lrsClient, never()).startLoadReporting(any(XdsLoadReportCallback.class));
+    inOrder.verify(lrsClient, never()).startLoadReporting(any(LoadReportCallback.class));

     // Simulate receiving a new service config with balancer name changed before xDS protocol is
     // established.
@@ -323,7 +323,7 @@ public class XdsLoadBalancerWithLrsTest {
     assertThat(streamRecorder.getValues()).hasSize(2);
     inOrder.verify(lrsClientFactory)
         .createLoadReportClient(same(oobChannel2), same(helper), same(backoffPolicyProvider),
-            same(statsStore));
+            same(loadStatsStore));

     // Simulate a syntactically correct EDS response.
     DiscoveryResponse edsResponse =
@@ -332,7 +332,7 @@ public class XdsLoadBalancerWithLrsTest {
         .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
         .build();
     serverResponseWriter.onNext(edsResponse);
-    inOrder.verify(lrsClient).startLoadReporting(any(XdsLoadReportCallback.class));
+    inOrder.verify(lrsClient).startLoadReporting(any(LoadReportCallback.class));

     // Simulate receiving a new service config with balancer name changed.
     newLbConfig = (Map<String, ?>) JsonParser.parse(
@@ -349,10 +349,10 @@ public class XdsLoadBalancerWithLrsTest {
     inOrder.verify(lrsClient).stopLoadReporting();
     inOrder.verify(lrsClientFactory)
         .createLoadReportClient(same(oobChannel3), same(helper), same(backoffPolicyProvider),
-            same(statsStore));
+            same(loadStatsStore));
     serverResponseWriter.onNext(edsResponse);
-    inOrder.verify(lrsClient).startLoadReporting(any(XdsLoadReportCallback.class));
+    inOrder.verify(lrsClient).startLoadReporting(any(LoadReportCallback.class));

     inOrder.verifyNoMoreInteractions();
   }
@@ -370,7 +370,7 @@ public class XdsLoadBalancerWithLrsTest {
     verify(lrsClientFactory)
         .createLoadReportClient(same(oobChannel1), same(helper), same(backoffPolicyProvider),
-            same(statsStore));
+            same(loadStatsStore));
     assertThat(streamRecorder.getValues()).hasSize(1);

     // Simulate a syntactically correct EDS response.
@@ -380,7 +380,7 @@ public class XdsLoadBalancerWithLrsTest {
         .setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
         .build();
     serverResponseWriter.onNext(edsResponse);
-    verify(lrsClient).startLoadReporting(any(XdsLoadReportCallback.class));
+    verify(lrsClient).startLoadReporting(any(LoadReportCallback.class));

     // Simulate receiving a new service config with child policy changed.
     @SuppressWarnings("unchecked")