xds: integrate LRS into XdsClient (#6490)

This change integrates invocation of client side load reporting into XdsClient's implementation:

- Changed LRS client implementation based on LRS design changes. In the new design, first LRS request contains a single ClusterStats message with cluster_name set to the cluster (AKA, CDS cluster) that this LRS client is reporting loads for (no stats data in first request). Then server responses back the name of cluster service (AKA, EDS service) to report loads for.

- Implemented newly proposed LRS client API for adding/removing sources of load stats data.

- Implemented XdsClient APIs for initiating/stopping load reporting.
This commit is contained in:
Chengyuan Zhang 2019-12-09 23:47:31 -08:00 committed by GitHub
parent 942c1c6b5f
commit f70f73f16c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 388 additions and 284 deletions

View File

@ -23,29 +23,26 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.LoadBalancer.Helper;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.BackoffPolicy.Provider;
import io.grpc.internal.GrpcUtil;
import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
@ -55,20 +52,20 @@ import javax.annotation.concurrent.NotThreadSafe;
@NotThreadSafe
final class LoadReportClientImpl implements LoadReportClient {
@VisibleForTesting
static final String TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD
= "com.googleapis.trafficdirector.grpc_hostname";
// TODO(chengyuanzhang): use channel logger once XdsClientImpl migrates to use channel logger.
private static final Logger logger = Logger.getLogger(XdsClientImpl.class.getName());
// The name of load-balanced service.
private final String serviceName;
private final String clusterName;
private final ManagedChannel channel;
private final Node node;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timerService;
private final Supplier<Stopwatch> stopwatchSupplier;
private final Stopwatch retryStopwatch;
private final ChannelLogger logger;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final LoadStatsStore loadStatsStore;
// Sources of load stats data for each service in cluster.
private final Map<String, LoadStatsStore> loadStatsStoreMap = new HashMap<>();
private boolean started;
@Nullable
@ -80,28 +77,21 @@ final class LoadReportClientImpl implements LoadReportClient {
@Nullable
private LoadReportCallback callback;
private LoadReportClientImpl(ManagedChannel channel,
Helper helper,
BackoffPolicy.Provider backoffPolicyProvider,
LoadStatsStore loadStatsStore) {
this(channel, helper, GrpcUtil.STOPWATCH_SUPPLIER, backoffPolicyProvider, loadStatsStore);
}
@VisibleForTesting
LoadReportClientImpl(ManagedChannel channel,
Helper helper,
Supplier<Stopwatch> stopwatchSupplier,
String clusterName,
Node node,
SynchronizationContext syncContext,
ScheduledExecutorService scheduledExecutorService,
BackoffPolicy.Provider backoffPolicyProvider,
LoadStatsStore loadStatsStore) {
Supplier<Stopwatch> stopwatchSupplier) {
this.channel = checkNotNull(channel, "channel");
this.serviceName = checkNotNull(helper.getAuthority(), "serviceName");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.clusterName = checkNotNull(clusterName, "clusterName");
this.node = checkNotNull(node, "node");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timerService = checkNotNull(scheduledExecutorService, "timeService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
this.retryStopwatch = stopwatchSupplier.get();
this.logger = checkNotNull(helper.getChannelLogger(), "logger");
this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.loadStatsStore = checkNotNull(loadStatsStore, "loadStatsStore");
started = false;
}
@ -124,7 +114,7 @@ final class LoadReportClientImpl implements LoadReportClient {
lrsRpcRetryTimer.cancel();
}
if (lrsStream != null) {
lrsStream.close(null);
lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException());
}
started = false;
// Do not shutdown channel as it is not owned by LrsClient.
@ -132,12 +122,12 @@ final class LoadReportClientImpl implements LoadReportClient {
@Override
public void addLoadStatsStore(String clusterServiceName, LoadStatsStore loadStatsStore) {
// TODO(chengyuanzhang): to be implemented.
loadStatsStoreMap.put(clusterServiceName, loadStatsStore);
}
@Override
public void removeLoadStatsStore(String clusterServiceName) {
// TODO(chengyuanzhang): to be implemented.
loadStatsStoreMap.remove(clusterServiceName);
}
@VisibleForTesting
@ -182,9 +172,12 @@ final class LoadReportClientImpl implements LoadReportClient {
long loadReportIntervalNano = -1;
ScheduledHandle loadReportTimer;
// The name for the google service the client talks to. Received on LRS responses.
// Name of cluster service to report loads for, instructed by LRS responses.
// Currently we expect a gRPC client only talks to a single service per cluster. But we
// could support switching cluster services, for which loads for a cluster may
// spread to multiple services.
@Nullable
String clusterName;
String clusterServiceName;
LrsStream(LoadReportingServiceGrpc.LoadReportingServiceStub stub, Stopwatch stopwatch) {
this.stub = checkNotNull(stub, "stub");
@ -196,14 +189,11 @@ final class LoadReportClientImpl implements LoadReportClient {
reportStopwatch.reset().start();
LoadStatsRequest initRequest =
LoadStatsRequest.newBuilder()
.setNode(Node.newBuilder()
.setMetadata(Struct.newBuilder()
.putFields(
TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD,
Value.newBuilder().setStringValue(serviceName).build())))
.setNode(node)
.addClusterStats(ClusterStats.newBuilder().setClusterName(clusterName))
.build();
lrsRequestWriter.onNext(initRequest);
logger.log(ChannelLogLevel.DEBUG, "Initial LRS request sent: {0}", initRequest);
logger.log(Level.FINE, "Initial LRS request sent: {0}", initRequest);
}
@Override
@ -241,20 +231,18 @@ final class LoadReportClientImpl implements LoadReportClient {
private void sendLoadReport() {
long interval = reportStopwatch.elapsed(TimeUnit.NANOSECONDS);
reportStopwatch.reset().start();
ClusterStats report =
loadStatsStore.generateLoadReport()
.toBuilder()
.setClusterName(clusterName)
.setLoadReportInterval(Durations.fromNanos(interval))
.build();
lrsRequestWriter.onNext(LoadStatsRequest.newBuilder()
.setNode(Node.newBuilder()
.setMetadata(Struct.newBuilder()
.putFields(
TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD,
Value.newBuilder().setStringValue(serviceName).build())))
.addClusterStats(report)
.build());
LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node);
if (loadStatsStoreMap.containsKey(clusterServiceName)) {
LoadStatsStore loadStatsStore = loadStatsStoreMap.get(clusterServiceName);
ClusterStats report =
loadStatsStore.generateLoadReport()
.toBuilder()
.setClusterName(clusterServiceName)
.setLoadReportInterval(Durations.fromNanos(interval))
.build();
requestBuilder.addClusterStats(report);
}
lrsRequestWriter.onNext(requestBuilder.build());
scheduleNextLoadReport();
}
@ -277,22 +265,24 @@ final class LoadReportClientImpl implements LoadReportClient {
}
if (!initialResponseReceived) {
logger.log(ChannelLogLevel.DEBUG, "Received LRS initial response: {0}", response);
logger.log(Level.FINE, "Received LRS initial response: {0}", response);
initialResponseReceived = true;
} else {
logger.log(ChannelLogLevel.DEBUG, "Received an LRS response: {0}", response);
logger.log(Level.FINE, "Received an LRS response: {0}", response);
}
loadReportIntervalNano = Durations.toNanos(response.getLoadReportingInterval());
callback.onReportResponse(loadReportIntervalNano);
List<String> serviceList = Collections.unmodifiableList(response.getClustersList());
// For gRPC use case, LRS response will only contain one cluster, which is the same as in
// the EDS response.
// For current gRPC use case, we expect traffic director only request client to report
// loads for a single service per cluster (which is the cluster service gRPC client talks
// to). We could support reporting loads for multiple services per cluster that gRPC
// client sends loads to due to service switching.
if (serviceList.size() != 1) {
logger.log(ChannelLogLevel.ERROR, "Received clusters: {0}, expect exactly one",
logger.log(Level.FINE, "Received clusters: {0}, expect exactly one",
serviceList);
return;
}
clusterName = serviceList.get(0);
clusterServiceName = serviceList.get(0);
scheduleNextLoadReport();
}
@ -318,7 +308,7 @@ final class LoadReportClientImpl implements LoadReportClient {
delayNanos =
lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
}
logger.log(ChannelLogLevel.DEBUG, "LRS stream closed, backoff in {0} second(s)",
logger.log(Level.FINE, "LRS stream closed, backoff in {0} second(s)",
TimeUnit.NANOSECONDS.toSeconds(delayNanos <= 0 ? 0 : delayNanos));
if (delayNanos <= 0) {
startLrsRpc();
@ -356,6 +346,8 @@ final class LoadReportClientImpl implements LoadReportClient {
/**
* Factory class for creating {@link LoadReportClient} instances.
*/
// TODO(chengyuanzhang): eliminate this factory after migrating EDS load balancer to
// use XdsClient.
abstract static class LoadReportClientFactory {
private static final LoadReportClientFactory DEFAULT_INSTANCE =
@ -363,11 +355,14 @@ final class LoadReportClientImpl implements LoadReportClient {
@Override
LoadReportClient createLoadReportClient(
ManagedChannel channel,
Helper helper,
Provider backoffPolicyProvider,
LoadStatsStore loadStatsStore) {
return new LoadReportClientImpl(channel, helper, backoffPolicyProvider,
loadStatsStore);
String clusterName,
Node node,
SynchronizationContext syncContext,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
return new LoadReportClientImpl(channel, clusterName, node, syncContext, timeService,
backoffPolicyProvider, stopwatchSupplier);
}
};
@ -375,7 +370,8 @@ final class LoadReportClientImpl implements LoadReportClient {
return DEFAULT_INSTANCE;
}
abstract LoadReportClient createLoadReportClient(ManagedChannel channel, Helper helper,
BackoffPolicy.Provider backoffPolicyProvider, LoadStatsStore loadStatsStore);
abstract LoadReportClient createLoadReportClient(ManagedChannel channel, String clusterName,
Node node, SynchronizationContext syncContext, ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier);
}
}

View File

@ -421,8 +421,16 @@ final class LookasideLb extends LoadBalancer {
// TODO(zdapeng): Use XdsClient to do Lrs directly.
// For now create an LRS Client.
if (xdsConfig.balancerName != null) {
lrsClient = loadReportClientFactory.createLoadReportClient(
channel, helper, new ExponentialBackoffPolicy.Provider(), loadStatsStore);
lrsClient =
loadReportClientFactory.createLoadReportClient(
channel,
helper.getAuthority(),
Node.getDefaultInstance(),
helper.getSynchronizationContext(),
helper.getScheduledExecutorService(),
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER);
lrsClient.addLoadStatsStore(edsServiceName, loadStatsStore);
} else {
lrsClient = new LoadReportClient() {
@Override

View File

@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Code;
@ -50,6 +51,7 @@ import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
import io.grpc.xds.LoadReportClient.LoadReportCallback;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -82,7 +84,8 @@ final class XdsClientImpl extends XdsClient {
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Stopwatch stopwatch;
private final Supplier<Stopwatch> stopwatchSupplier;
private final Stopwatch adsStreamRetryStopwatch;
// The node identifier to be included in xDS requests. Management server only requires the
// first request to carry the node identifier on a stream. It should be identical if present
// more than once.
@ -114,6 +117,9 @@ final class XdsClientImpl extends XdsClient {
// watchers can watch endpoints in the same cluster.
private final Map<String, Set<EndpointWatcher>> endpointWatchers = new HashMap<>();
// Load reporting clients, with each responsible for reporting loads of a single cluster.
private final Map<String, LoadReportClientImpl> lrsClients = new HashMap<>();
@Nullable
private AdsStream adsStream;
@Nullable
@ -139,7 +145,7 @@ final class XdsClientImpl extends XdsClient {
SynchronizationContext syncContext,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Stopwatch stopwatch) {
Supplier<Stopwatch> stopwatchSupplier) {
this.channel =
checkNotNull(channelFactory, "channelFactory")
.createChannel(checkNotNull(servers, "servers"));
@ -147,7 +153,8 @@ final class XdsClientImpl extends XdsClient {
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timeService = checkNotNull(timeService, "timeService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.stopwatch = checkNotNull(stopwatch, "stopwatch");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatch");
adsStreamRetryStopwatch = stopwatchSupplier.get();
}
@Override
@ -156,6 +163,9 @@ final class XdsClientImpl extends XdsClient {
if (adsStream != null) {
adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
}
for (LoadReportClientImpl lrsClient : lrsClients.values()) {
lrsClient.stopLoadReporting();
}
if (rpcRetryTimer != null) {
rpcRetryTimer.cancel();
}
@ -297,6 +307,40 @@ final class XdsClientImpl extends XdsClient {
}
}
@Override
LoadReportClient reportClientStats(String clusterName, String serverUri) {
checkNotNull(serverUri, "serverUri");
checkArgument(serverUri.equals(""),
"Currently only support empty serverUri, which defaults to the same "
+ "management server this client talks to.");
if (!lrsClients.containsKey(clusterName)) {
LoadReportClientImpl lrsClient =
new LoadReportClientImpl(
channel,
clusterName,
node,
syncContext,
timeService,
backoffPolicyProvider,
stopwatchSupplier);
lrsClient.startLoadReporting(
new LoadReportCallback() {
@Override
public void onReportResponse(long reportIntervalNano) {}
});
lrsClients.put(clusterName, lrsClient);
}
return lrsClients.get(clusterName);
}
@Override
void cancelClientStatsReport(String clusterName) {
LoadReportClientImpl lrsClient = lrsClients.remove(clusterName);
if (lrsClient != null) {
lrsClient.stopLoadReporting();
}
}
/**
* Establishes the RPC connection by creating a new RPC stream on the given channel for
* xDS protocol communication.
@ -307,7 +351,7 @@ final class XdsClientImpl extends XdsClient {
AggregatedDiscoveryServiceGrpc.newStub(channel);
adsStream = new AdsStream(stub);
adsStream.start();
stopwatch.reset().start();
adsStreamRetryStopwatch.reset().start();
}
/**
@ -894,7 +938,8 @@ final class XdsClientImpl extends XdsClient {
delayNanos =
Math.max(
0,
retryBackoffPolicy.nextBackoffNanos() - stopwatch.elapsed(TimeUnit.NANOSECONDS));
retryBackoffPolicy.nextBackoffNanos()
- adsStreamRetryStopwatch.elapsed(TimeUnit.NANOSECONDS));
}
logger.log(Level.FINE, "{0} stream closed, retry in {1} ns", new Object[]{this, delayNanos});
rpcRetryTimer =

View File

@ -19,20 +19,16 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.Iterables;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.api.v2.core.Locality;
import io.envoyproxy.envoy.api.v2.core.Node;
@ -42,8 +38,8 @@ import io.envoyproxy.envoy.api.v2.endpoint.UpstreamLocalityStats;
import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse;
import io.grpc.ChannelLogger;
import io.grpc.LoadBalancer.Helper;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
@ -54,10 +50,10 @@ import io.grpc.internal.FakeClock;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.LoadReportClient.LoadReportCallback;
import java.text.MessageFormat;
import java.util.ArrayDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -69,8 +65,6 @@ import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Unit tests for {@link LoadReportClientImpl}.
@ -78,8 +72,8 @@ import org.mockito.stubbing.Answer;
@RunWith(JUnit4.class)
public class LoadReportClientImplTest {
private static final String SERVICE_AUTHORITY = "api.google.com";
private static final String CLUSTER_NAME = "gslb-namespace:gslb-service-name";
private static final String CLUSTER_NAME = "foo.blade.googleapis.com";
private static final Node NODE = Node.newBuilder().setId("LRS test").build();
private static final FakeClock.TaskFilter LOAD_REPORTING_TASK_FILTER =
new FakeClock.TaskFilter() {
@Override
@ -102,13 +96,11 @@ public class LoadReportClientImplTest {
.setZone("test_zone")
.setSubZone("test_subzone")
.build();
private static final LoadStatsRequest EXPECTED_INITIAL_REQ = LoadStatsRequest.newBuilder()
.setNode(Node.newBuilder()
.setMetadata(Struct.newBuilder()
.putFields(
LoadReportClientImpl.TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD,
Value.newBuilder().setStringValue(SERVICE_AUTHORITY).build())))
.build();
private static final LoadStatsRequest EXPECTED_INITIAL_REQ =
LoadStatsRequest.newBuilder()
.setNode(NODE)
.addClusterStats(ClusterStats.newBuilder().setClusterName(CLUSTER_NAME))
.build();
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
@ -119,24 +111,11 @@ public class LoadReportClientImplTest {
throw new AssertionError(e);
}
});
private final ArrayDeque<String> logs = new ArrayDeque<>();
private final ChannelLogger channelLogger = new ChannelLogger() {
@Override
public void log(ChannelLogLevel level, String msg) {
logs.add(level + ": " + msg);
}
@Override
public void log(ChannelLogLevel level, String template, Object... args) {
log(level, MessageFormat.format(template, args));
}
};
private final FakeClock fakeClock = new FakeClock();
private final ArrayDeque<StreamObserver<LoadStatsRequest>> lrsRequestObservers =
new ArrayDeque<>();
private final AtomicBoolean callEnded = new AtomicBoolean(true);
@Mock
private Helper helper;
@Mock
private BackoffPolicy.Provider backoffPolicyProvider;
@Mock
@ -144,7 +123,7 @@ public class LoadReportClientImplTest {
@Mock
private BackoffPolicy backoffPolicy2;
@Mock
private LoadStatsStore loadStatsStore;
private LoadStatsStore mockLoadStatsStore;
@Mock
private LoadReportCallback callback;
@Captor
@ -164,16 +143,17 @@ public class LoadReportClientImplTest {
@Override
public StreamObserver<LoadStatsRequest> streamLoadStats(
final StreamObserver<LoadStatsResponse> responseObserver) {
assertThat(callEnded.get()).isTrue(); // ensure previous call was ended
callEnded.set(false);
Context.current().addListener(
new CancellationListener() {
@Override
public void cancelled(Context context) {
callEnded.set(true);
}
}, MoreExecutors.directExecutor());
StreamObserver<LoadStatsRequest> requestObserver =
mock(StreamObserver.class);
Answer<Void> closeRpc = new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
responseObserver.onCompleted();
return null;
}
};
doAnswer(closeRpc).when(requestObserver).onCompleted();
lrsRequestObservers.add(requestObserver);
return requestObserver;
}
@ -183,25 +163,26 @@ public class LoadReportClientImplTest {
.addService(mockLoadReportingService).build().start());
channel = cleanupRule.register(
InProcessChannelBuilder.forName("fakeLoadReportingServer").directExecutor().build());
when(helper.getSynchronizationContext()).thenReturn(syncContext);
when(helper.getScheduledExecutorService()).thenReturn(fakeClock.getScheduledExecutorService());
when(helper.getChannelLogger()).thenReturn(channelLogger);
when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY);
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2);
when(backoffPolicy1.nextBackoffNanos())
.thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L));
when(backoffPolicy2.nextBackoffNanos())
.thenReturn(TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(10L));
lrsClient =
new LoadReportClientImpl(channel, helper, fakeClock.getStopwatchSupplier(),
new LoadReportClientImpl(
channel,
CLUSTER_NAME,
NODE, syncContext,
fakeClock.getScheduledExecutorService(),
backoffPolicyProvider,
loadStatsStore);
fakeClock.getStopwatchSupplier());
lrsClient.startLoadReporting(callback);
}
@After
public void tearDown() {
lrsClient.stopLoadReporting();
assertThat(callEnded.get()).isTrue();
}
@Test
@ -229,32 +210,39 @@ public class LoadReportClientImplTest {
verifyNoMoreInteractions(requestObserver);
lrsClient.stopLoadReporting();
verify(requestObserver).onCompleted();
assertThat(callEnded.get()).isTrue();
assertThat(fakeClock.getPendingTasks(LRS_RPC_RETRY_TASK_FILTER)).isEmpty();
lrsClient.stopLoadReporting();
verifyNoMoreInteractions(requestObserver);
assertThat(callEnded.get()).isTrue();
lrsClient.startLoadReporting(callback);
verify(mockLoadReportingService, times(2)).streamLoadStats(lrsResponseObserverCaptor.capture());
assertThat(lrsRequestObservers).hasSize(2);
}
// Currently we expect each gRPC client talks to a single service per cluster, so we test LRS
// client reporting load for a single cluster service only.
// TODO(chengyuanzhang): Existing test suites for LRS client implementation have poor behavior
// coverage and are not robust. Should improve once its usage is finalized without too much
// assumption.
@Test
public void loadReportActualIntervalAsSpecified() {
verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
StreamObserver<LoadStatsResponse> responseObserver = lrsResponseObserverCaptor.getValue();
assertThat(lrsRequestObservers).hasSize(1);
StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
when(loadStatsStore.generateLoadReport()).thenReturn(ClusterStats.newBuilder().build());
InOrder inOrder = inOrder(requestObserver, loadStatsStore);
inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
assertThat(logs).containsExactly("DEBUG: Initial LRS request sent: " + EXPECTED_INITIAL_REQ);
logs.poll();
responseObserver.onNext(buildLrsResponse(1453));
assertThat(logs).containsExactly(
"DEBUG: Received LRS initial response: " + buildLrsResponse(1453));
assertNextReport(inOrder, requestObserver, buildEmptyClusterStats(1453));
// Add load stats source for some cluster service.
when(mockLoadStatsStore.generateLoadReport()).thenReturn(ClusterStats.newBuilder().build());
lrsClient.addLoadStatsStore("namespace-foo:service-blade", mockLoadStatsStore);
InOrder inOrder = inOrder(requestObserver, mockLoadStatsStore);
inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
responseObserver.onNext(buildLrsResponse("namespace-foo:service-blade", 1453));
assertNextReport(inOrder, requestObserver, mockLoadStatsStore,
buildEmptyClusterStats("namespace-foo:service-blade", 1453));
verify(callback).onReportResponse(1453);
}
@ -265,36 +253,83 @@ public class LoadReportClientImplTest {
assertThat(lrsRequestObservers).hasSize(1);
StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
when(loadStatsStore.generateLoadReport()).thenReturn(ClusterStats.newBuilder().build());
// Add load stats source for some cluster service.
when(mockLoadStatsStore.generateLoadReport()).thenReturn(ClusterStats.newBuilder().build());
lrsClient.addLoadStatsStore("namespace-foo:service-blade", mockLoadStatsStore);
InOrder inOrder = inOrder(requestObserver, loadStatsStore);
InOrder inOrder = inOrder(requestObserver, mockLoadStatsStore);
inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
assertThat(logs).containsExactly("DEBUG: Initial LRS request sent: " + EXPECTED_INITIAL_REQ);
logs.poll();
responseObserver.onNext(buildLrsResponse(1362));
assertThat(logs).containsExactly(
"DEBUG: Received LRS initial response: " + buildLrsResponse(1362));
logs.poll();
assertNextReport(inOrder, requestObserver, buildEmptyClusterStats(1362));
responseObserver.onNext(buildLrsResponse("namespace-foo:service-blade", 1362));
assertNextReport(inOrder, requestObserver, mockLoadStatsStore,
buildEmptyClusterStats("namespace-foo:service-blade", 1362));
verify(callback).onReportResponse(1362);
responseObserver.onNext(buildLrsResponse(2183345));
assertThat(logs).containsExactly(
"DEBUG: Received an LRS response: " + buildLrsResponse(2183345));
responseObserver.onNext(buildLrsResponse("namespace-foo:service-blade", 2183345));
// Updated load reporting interval becomes effective immediately.
assertNextReport(inOrder, requestObserver, buildEmptyClusterStats(2183345));
assertNextReport(inOrder, requestObserver, mockLoadStatsStore,
buildEmptyClusterStats("namespace-foo:service-blade", 2183345));
verify(callback).onReportResponse(2183345);
}
@Test
public void reportNothingIfLoadStatsSourceNotAvailable() {
verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
StreamObserver<LoadStatsResponse> responseObserver = lrsResponseObserverCaptor.getValue();
assertThat(lrsRequestObservers).hasSize(1);
StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ));
// Server asks to report load for some cluster service.
responseObserver.onNext(buildLrsResponse("namespace-foo:service-blade", 1395));
// Nothing to be reported as no load stats data is available.
fakeClock.forwardNanos(1395);
ArgumentCaptor<LoadStatsRequest> reportCaptor = ArgumentCaptor.forClass(null);
verify(requestObserver, times(2)).onNext(reportCaptor.capture());
assertThat(reportCaptor.getValue().getClusterStatsCount()).isEqualTo(0);
// Add load stats source.
ClusterStats clusterStats = ClusterStats.newBuilder()
.setClusterName("namespace-foo:service-blade")
.setLoadReportInterval(Durations.fromNanos(50))
.addUpstreamLocalityStats(UpstreamLocalityStats.newBuilder()
.setLocality(TEST_LOCALITY)
.setTotalRequestsInProgress(542)
.setTotalSuccessfulRequests(645)
.setTotalErrorRequests(85)
.setTotalIssuedRequests(27))
.addDroppedRequests(DroppedRequests.newBuilder()
.setCategory("lb")
.setDroppedCount(0))
.addDroppedRequests(DroppedRequests.newBuilder()
.setCategory("throttle")
.setDroppedCount(14))
.setTotalDroppedRequests(14)
.build();
when(mockLoadStatsStore.generateLoadReport()).thenReturn(clusterStats);
lrsClient.addLoadStatsStore("namespace-foo:service-blade", mockLoadStatsStore);
// Loads reported.
fakeClock.forwardNanos(1395);
verify(requestObserver, times(3)).onNext(reportCaptor.capture());
assertThat(reportCaptor.getValue().getClusterStatsCount()).isEqualTo(1);
// Delete load stats source.
lrsClient.removeLoadStatsStore("namespace-foo:service-blade");
// Nothing to report as load stats data is not available.
fakeClock.forwardNanos(1395);
verify(requestObserver, times(4)).onNext(reportCaptor.capture());
assertThat(reportCaptor.getValue().getClusterStatsCount()).isEqualTo(0);
}
@Test
public void reportRecordedLoadData() {
verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
StreamObserver<LoadStatsResponse> responseObserver = lrsResponseObserverCaptor.getValue();
assertThat(lrsRequestObservers).hasSize(1);
StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
InOrder inOrder = inOrder(requestObserver, loadStatsStore);
inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
long callsInProgress = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
long callsSucceeded = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
@ -304,7 +339,7 @@ public class LoadReportClientImplTest {
long numThrottleDrops = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
ClusterStats expectedStats1 = ClusterStats.newBuilder()
.setClusterName(CLUSTER_NAME)
.setClusterName("namespace-foo:service-blade")
.setLoadReportInterval(Durations.fromNanos(1362))
.addUpstreamLocalityStats(UpstreamLocalityStats.newBuilder()
.setLocality(TEST_LOCALITY)
@ -321,7 +356,7 @@ public class LoadReportClientImplTest {
.setTotalDroppedRequests(numLbDrops + numThrottleDrops)
.build();
ClusterStats expectedStats2 = ClusterStats.newBuilder()
.setClusterName(CLUSTER_NAME)
.setClusterName("namespace-foo:service-blade")
.setLoadReportInterval(Durations.fromNanos(1362))
.addUpstreamLocalityStats(UpstreamLocalityStats.newBuilder()
.setLocality(TEST_LOCALITY)
@ -334,12 +369,18 @@ public class LoadReportClientImplTest {
.setDroppedCount(0))
.setTotalDroppedRequests(0)
.build();
when(loadStatsStore.generateLoadReport()).thenReturn(expectedStats1, expectedStats2);
responseObserver.onNext(buildLrsResponse(1362));
assertNextReport(inOrder, requestObserver, expectedStats1);
// Add load stats source for some cluster service.
when(mockLoadStatsStore.generateLoadReport()).thenReturn(expectedStats1, expectedStats2);
lrsClient.addLoadStatsStore("namespace-foo:service-blade", mockLoadStatsStore);
assertNextReport(inOrder, requestObserver, expectedStats2);
InOrder inOrder = inOrder(requestObserver, mockLoadStatsStore);
inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
responseObserver.onNext(buildLrsResponse("namespace-foo:service-blade", 1362));
assertNextReport(inOrder, requestObserver, mockLoadStatsStore, expectedStats1);
assertNextReport(inOrder, requestObserver, mockLoadStatsStore, expectedStats2);
}
@Test
@ -353,8 +394,6 @@ public class LoadReportClientImplTest {
// First balancer RPC
verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
assertThat(logs).containsExactly("DEBUG: Initial LRS request sent: " + EXPECTED_INITIAL_REQ);
logs.poll();
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Balancer closes it immediately (erroneously)
@ -363,8 +402,6 @@ public class LoadReportClientImplTest {
// Will start backoff sequence 1 (1s)
inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(logs).containsExactly("DEBUG: LRS stream closed, backoff in 1 second(s)");
logs.poll();
assertEquals(1, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Fast-forward to a moment before the retry
@ -377,8 +414,6 @@ public class LoadReportClientImplTest {
assertThat(lrsRequestObservers).hasSize(1);
requestObserver = lrsRequestObservers.poll();
verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ));
assertThat(logs).containsExactly("DEBUG: Initial LRS request sent: " + EXPECTED_INITIAL_REQ);
logs.poll();
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Balancer closes it with an error.
@ -386,8 +421,6 @@ public class LoadReportClientImplTest {
// Will continue the backoff sequence 1 (10s)
verifyNoMoreInteractions(backoffPolicyProvider);
inOrder.verify(backoffPolicy1).nextBackoffNanos();
assertThat(logs).containsExactly("DEBUG: LRS stream closed, backoff in 10 second(s)");
logs.poll();
assertEquals(1, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Fast-forward to a moment before the retry
@ -400,15 +433,10 @@ public class LoadReportClientImplTest {
assertThat(lrsRequestObservers).hasSize(1);
requestObserver = lrsRequestObservers.poll();
verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ));
assertThat(logs).containsExactly("DEBUG: Initial LRS request sent: " + EXPECTED_INITIAL_REQ);
logs.poll();
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Balancer sends initial response.
responseObserver.onNext(buildLrsResponse(0));
assertThat(logs).containsExactly(
"DEBUG: Received LRS initial response: " + buildLrsResponse(0));
logs.poll();
// Balancer sends a response asking for loads of some cluster service.
responseObserver.onNext(buildLrsResponse("namespace-foo:service-blade", 0));
// Then breaks the RPC
responseObserver.onError(Status.UNAVAILABLE.asException());
@ -416,9 +444,6 @@ public class LoadReportClientImplTest {
// Will reset the retry sequence and retry immediately, because balancer has responded.
inOrder.verify(backoffPolicyProvider).get();
inOrder.verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
assertThat(logs).containsExactly("DEBUG: LRS stream closed, backoff in 0 second(s)",
"DEBUG: Initial LRS request sent: " + EXPECTED_INITIAL_REQ);
logs.clear();
responseObserver = lrsResponseObserverCaptor.getValue();
assertThat(lrsRequestObservers).hasSize(1);
requestObserver = lrsRequestObservers.poll();
@ -430,9 +455,6 @@ public class LoadReportClientImplTest {
// Will be on the first retry (1s) of backoff sequence 2.
inOrder.verify(backoffPolicy2).nextBackoffNanos();
// The logged backoff time will be 0 seconds as it is in granularity of seconds.
assertThat(logs).containsExactly("DEBUG: LRS stream closed, backoff in 0 second(s)");
logs.poll();
assertEquals(1, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Fast-forward to a moment before the retry, the time spent in the last try is deducted.
@ -444,7 +466,6 @@ public class LoadReportClientImplTest {
assertThat(lrsRequestObservers).hasSize(1);
requestObserver = lrsRequestObservers.poll();
verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ));
assertThat(logs).containsExactly("DEBUG: Initial LRS request sent: " + EXPECTED_INITIAL_REQ);
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Wrapping up
@ -460,37 +481,9 @@ public class LoadReportClientImplTest {
assertThat(lrsRequestObservers).hasSize(1);
StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
// First LRS request sent.
verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Balancer sends back a normal response.
responseObserver.onNext(buildLrsResponse(100));
// A load reporting task is scheduled.
assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
fakeClock.forwardNanos(99);
verifyNoMoreInteractions(requestObserver);
// Balancer closes the stream with error.
responseObserver.onError(Status.UNKNOWN.asException());
// The unsent load report is cancelled.
assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
// Will retry immediately as balancer has responded previously.
verify(mockLoadReportingService, times(2)).streamLoadStats(lrsResponseObserverCaptor.capture());
responseObserver = lrsResponseObserverCaptor.getValue();
assertThat(lrsRequestObservers).hasSize(1);
requestObserver = lrsRequestObservers.poll();
InOrder inOrder = inOrder(requestObserver, loadStatsStore);
inOrder.verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ));
// Balancer sends another response with a different report interval.
responseObserver.onNext(buildLrsResponse(50));
// Load reporting runs normally.
// Add load stats source for some cluster service.
ClusterStats stats1 = ClusterStats.newBuilder()
.setClusterName(CLUSTER_NAME)
.setClusterName("namespace-foo:service-blade")
.setLoadReportInterval(Durations.fromNanos(50))
.addUpstreamLocalityStats(UpstreamLocalityStats.newBuilder()
.setLocality(TEST_LOCALITY)
@ -507,7 +500,7 @@ public class LoadReportClientImplTest {
.setTotalDroppedRequests(14)
.build();
ClusterStats stats2 = ClusterStats.newBuilder()
.setClusterName(CLUSTER_NAME)
.setClusterName("namespace-foo:service-blade")
.setLoadReportInterval(Durations.fromNanos(50))
.addUpstreamLocalityStats(UpstreamLocalityStats.newBuilder()
.setLocality(TEST_LOCALITY)
@ -520,9 +513,40 @@ public class LoadReportClientImplTest {
.setDroppedCount(0))
.setTotalDroppedRequests(0)
.build();
when(loadStatsStore.generateLoadReport()).thenReturn(stats1, stats2);
assertNextReport(inOrder, requestObserver, stats1);
assertNextReport(inOrder, requestObserver, stats2);
when(mockLoadStatsStore.generateLoadReport()).thenReturn(stats1, stats2);
lrsClient.addLoadStatsStore("namespace-foo:service-blade", mockLoadStatsStore);
// First LRS request sent.
verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Balancer sends a response asking for loads of some cluster service.
responseObserver.onNext(buildLrsResponse("namespace-foo:service-blade", 100));
// A load reporting task is scheduled.
assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
fakeClock.forwardNanos(99);
verifyNoMoreInteractions(requestObserver);
// Balancer closes the stream with error.
responseObserver.onError(Status.UNKNOWN.asException());
// The unsent load report is cancelled.
assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
// Will retry immediately as balancer has responded previously.
verify(mockLoadReportingService, times(2)).streamLoadStats(lrsResponseObserverCaptor.capture());
responseObserver = lrsResponseObserverCaptor.getValue();
assertThat(lrsRequestObservers).hasSize(1);
requestObserver = lrsRequestObservers.poll();
InOrder inOrder = inOrder(requestObserver, mockLoadStatsStore);
inOrder.verify(requestObserver).onNext(eq(EXPECTED_INITIAL_REQ));
// Balancer sends another response with a different report interval.
responseObserver.onNext(buildLrsResponse("namespace-foo:service-blade", 50));
// Load reporting runs normally.
assertNextReport(inOrder, requestObserver, mockLoadStatsStore, stats1);
assertNextReport(inOrder, requestObserver, mockLoadStatsStore, stats2);
}
@Test
@ -531,23 +555,22 @@ public class LoadReportClientImplTest {
StreamObserver<LoadStatsResponse> responseObserver = lrsResponseObserverCaptor.getValue();
assertThat(lrsRequestObservers).hasSize(1);
StreamObserver<LoadStatsRequest> requestObserver = lrsRequestObservers.poll();
InOrder inOrder = inOrder(requestObserver);
// First balancer RPC
inOrder.verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
verify(requestObserver).onNext(EXPECTED_INITIAL_REQ);
assertEquals(0, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
// Simulate receiving LB response
// Simulate receiving a response from traffic director.
assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
responseObserver.onNext(buildLrsResponse(1983));
responseObserver.onNext(buildLrsResponse("namespace-foo:service-blade", 1983));
// Load reporting task is scheduled
assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
FakeClock.ScheduledTask scheduledTask =
Iterables.getOnlyElement(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER));
assertEquals(1983, scheduledTask.getDelay(TimeUnit.NANOSECONDS));
// Close lbStream
requestObserver.onCompleted();
// Close RPC stream.
responseObserver.onCompleted();
// Reporting task cancelled
assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
@ -556,24 +579,26 @@ public class LoadReportClientImplTest {
scheduledTask.command.run();
// No report sent. No new task scheduled
inOrder.verify(requestObserver, never()).onNext(any(LoadStatsRequest.class));
verifyNoMoreInteractions(requestObserver);
assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
}
private static ClusterStats buildEmptyClusterStats(long loadReportIntervalNanos) {
private static ClusterStats buildEmptyClusterStats(String clusterServiceName,
long loadReportIntervalNanos) {
return ClusterStats.newBuilder()
.setClusterName(CLUSTER_NAME)
.setClusterName(clusterServiceName)
.setLoadReportInterval(Durations.fromNanos(loadReportIntervalNanos)).build();
}
private static LoadStatsResponse buildLrsResponse(long loadReportIntervalNanos) {
private static LoadStatsResponse buildLrsResponse(String clusterServiceName,
long loadReportIntervalNanos) {
return LoadStatsResponse.newBuilder()
.addClusters(CLUSTER_NAME)
.addClusters(clusterServiceName)
.setLoadReportingInterval(Durations.fromNanos(loadReportIntervalNanos)).build();
}
private void assertNextReport(InOrder inOrder, StreamObserver<LoadStatsRequest> requestObserver,
ClusterStats expectedStats) {
LoadStatsStore loadStatsStore, ClusterStats expectedStats) {
long loadReportIntervalNanos = Durations.toNanos(expectedStats.getLoadReportInterval());
assertEquals(0, fakeClock.forwardTime(loadReportIntervalNanos - 1, TimeUnit.NANOSECONDS));
inOrder.verifyNoMoreInteractions();
@ -584,12 +609,7 @@ public class LoadReportClientImplTest {
ArgumentCaptor<LoadStatsRequest> reportCaptor = ArgumentCaptor.forClass(null);
inOrder.verify(requestObserver).onNext(reportCaptor.capture());
LoadStatsRequest report = reportCaptor.getValue();
assertEquals(report.getNode(), Node.newBuilder()
.setMetadata(Struct.newBuilder()
.putFields(
LoadReportClientImpl.TRAFFICDIRECTOR_GRPC_HOSTNAME_FIELD,
Value.newBuilder().setStringValue(SERVICE_AUTHORITY).build()))
.build());
assertEquals(report.getNode(), NODE);
assertEquals(1, report.getClusterStatsCount());
assertThat(report.getClusterStats(0)).isEqualTo(expectedStats);
}

View File

@ -31,8 +31,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@ -65,7 +66,7 @@ import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.BackoffPolicy.Provider;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.internal.JsonParser;
import io.grpc.internal.ObjectPool;
@ -90,6 +91,7 @@ import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -218,8 +220,9 @@ public class LookasideLbTest {
LoadReportClientFactory loadReportClientFactory = new LoadReportClientFactory() {
@Override
LoadReportClient createLoadReportClient(ManagedChannel channel, Helper helper,
Provider backoffPolicyProvider, LoadStatsStore loadStatsStore) {
LoadReportClient createLoadReportClient(ManagedChannel channel, String clusterName,
Node node, SynchronizationContext syncContext, ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier) {
LoadReportClient loadReportClient = mock(LoadReportClient.class);
loadReportClients.add(loadReportClient);
return loadReportClient;
@ -819,50 +822,4 @@ public class LookasideLbTest {
verify(loadReportClient).stopLoadReporting();
assertThat(channel.isShutdown()).isTrue();
}
/**
* Tests load reporting is initiated after receiving the first valid EDS response from the traffic
* director, then its operation is independent of load balancing until xDS load balancer is
* shutdown.
*/
@Test
public void reportLoadAfterReceivingFirstEdsResponseUntilShutdown() {
lookasideLb.handleResolvedAddresses(defaultResolvedAddress);
// Simulates a syntactically incorrect EDS response.
serverResponseWriter.onNext(DiscoveryResponse.getDefaultInstance());
LoadReportClient loadReportClient = Iterables.getOnlyElement(loadReportClients);
verify(loadReportClient, never()).startLoadReporting(any(LoadReportCallback.class));
verify(edsUpdateCallback, never()).onWorking();
verify(edsUpdateCallback, never()).onError();
// Simulate a syntactically correct EDS response.
DiscoveryResponse edsResponse =
DiscoveryResponse.newBuilder()
.addResources(Any.pack(ClusterLoadAssignment.getDefaultInstance()))
.setTypeUrl("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
.build();
serverResponseWriter.onNext(edsResponse);
verify(edsUpdateCallback).onWorking();
ArgumentCaptor<LoadReportCallback> lrsCallbackCaptor = ArgumentCaptor.forClass(null);
verify(loadReportClient).startLoadReporting(lrsCallbackCaptor.capture());
lrsCallbackCaptor.getValue().onReportResponse(19543);
LocalityStore localityStore = Iterables.getOnlyElement(localityStores);
verify(localityStore).updateOobMetricsReportInterval(19543);
// Simulate another EDS response from the same remote balancer.
serverResponseWriter.onNext(edsResponse);
verifyNoMoreInteractions(edsUpdateCallback, loadReportClient);
// Simulate an EDS error response.
serverResponseWriter.onError(Status.ABORTED.asException());
verify(edsUpdateCallback).onError();
verifyNoMoreInteractions(edsUpdateCallback, loadReportClient);
verify(localityStore, times(1)).updateOobMetricsReportInterval(anyLong()); // only once
lookasideLb.shutdown();
}
}

View File

@ -55,12 +55,16 @@ import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource;
import io.envoyproxy.envoy.api.v2.core.ConfigSource;
import io.envoyproxy.envoy.api.v2.core.HealthStatus;
import io.envoyproxy.envoy.api.v2.core.Node;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.envoyproxy.envoy.api.v2.route.RedirectAction;
import io.envoyproxy.envoy.api.v2.route.Route;
import io.envoyproxy.envoy.api.v2.route.VirtualHost;
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager;
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.LoadReportingServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.ManagedChannel;
@ -93,6 +97,7 @@ import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -140,6 +145,9 @@ public class XdsClientImplTest {
private final Queue<StreamObserver<DiscoveryRequest>> requestObservers = new ArrayDeque<>();
private final AtomicBoolean callEnded = new AtomicBoolean(true);
private final Queue<LoadReportCall> loadReportCalls = new ArrayDeque<>();
private final AtomicInteger runningLrsCalls = new AtomicInteger();
@Mock
private AggregatedDiscoveryServiceImplBase mockedDiscoveryService;
@Mock
@ -166,7 +174,7 @@ public class XdsClientImplTest {
when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L);
final String serverName = InProcessServerBuilder.generateName();
AggregatedDiscoveryServiceImplBase serviceImpl = new AggregatedDiscoveryServiceImplBase() {
AggregatedDiscoveryServiceImplBase adsServiceImpl = new AggregatedDiscoveryServiceImplBase() {
@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
final StreamObserver<DiscoveryResponse> responseObserver) {
@ -187,12 +195,34 @@ public class XdsClientImplTest {
}
};
mockedDiscoveryService =
mock(AggregatedDiscoveryServiceImplBase.class, delegatesTo(serviceImpl));
mock(AggregatedDiscoveryServiceImplBase.class, delegatesTo(adsServiceImpl));
LoadReportingServiceImplBase lrsServiceImpl = new LoadReportingServiceImplBase() {
@Override
public StreamObserver<LoadStatsRequest> streamLoadStats(
StreamObserver<LoadStatsResponse> responseObserver) {
runningLrsCalls.getAndIncrement();
@SuppressWarnings("unchecked")
StreamObserver<LoadStatsRequest> requestObserver = mock(StreamObserver.class);
final LoadReportCall call = new LoadReportCall(requestObserver, responseObserver);
Context.current().addListener(
new CancellationListener() {
@Override
public void cancelled(Context context) {
call.cancelled = true;
runningLrsCalls.getAndDecrement();
}
}, MoreExecutors.directExecutor());
loadReportCalls.offer(call);
return requestObserver;
}
};
cleanupRule.register(
InProcessServerBuilder
.forName(serverName)
.addService(mockedDiscoveryService)
.addService(lrsServiceImpl)
.directExecutor()
.build()
.start());
@ -213,17 +243,22 @@ public class XdsClientImplTest {
xdsClient =
new XdsClientImpl(servers, channelFactory, NODE, syncContext,
fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
fakeClock.getStopwatchSupplier().get());
fakeClock.getStopwatchSupplier());
// Only the connection to management server is established, no RPC request is sent until at
// least one watcher is registered.
assertThat(responseObservers).isEmpty();
assertThat(requestObservers).isEmpty();
// Load reporting is not initiated until being invoked to do so.
assertThat(loadReportCalls).isEmpty();
assertThat(runningLrsCalls.get()).isEqualTo(0);
}
@After
public void tearDown() {
xdsClient.shutdown();
assertThat(callEnded.get()).isTrue();
assertThat(runningLrsCalls.get()).isEqualTo(0);
assertThat(channel.isShutdown()).isTrue();
assertThat(fakeClock.getPendingTasks()).isEmpty();
}
@ -2559,6 +2594,28 @@ public class XdsClientImplTest {
backoffPolicy2);
}
/**
* Tests sending a streaming LRS RPC for each cluster to report loads for.
*/
@Test
public void reportLoadStatsToServer() {
xdsClient.reportClientStats("cluster-foo.googleapis.com", "");
LoadReportCall lrsCall1 = loadReportCalls.poll();
verify(lrsCall1.requestObserver)
.onNext(eq(buildInitialLoadStatsRequest("cluster-foo.googleapis.com")));
assertThat(lrsCall1.cancelled).isFalse();
xdsClient.reportClientStats("cluster-bar.googleapis.com", "");
LoadReportCall lrsCall2 = loadReportCalls.poll();
verify(lrsCall2.requestObserver)
.onNext(eq(buildInitialLoadStatsRequest("cluster-bar.googleapis.com")));
assertThat(lrsCall2.cancelled).isFalse();
xdsClient.cancelClientStatsReport("cluster-bar.googleapis.com");
assertThat(lrsCall2.cancelled).isTrue();
assertThat(runningLrsCalls.get()).isEqualTo(1);
}
// Simulates the use case of watching clusters/endpoints based on service config resolved by
// LDS/RDS.
private void waitUntilConfigResolved(StreamObserver<DiscoveryResponse> responseObserver) {
@ -2640,6 +2697,14 @@ public class XdsClientImplTest {
private static LoadStatsRequest buildInitialLoadStatsRequest(String clusterName) {
return
LoadStatsRequest.newBuilder()
.setNode(NODE)
.addClusterStats(ClusterStats.newBuilder().setClusterName(clusterName))
.build();
}
/**
* Matcher for DiscoveryRequest without the comparison of error_details field, which is used for
* management server debugging purposes.
@ -2684,4 +2749,17 @@ public class XdsClientImplTest {
return NODE.equals(argument.getNode());
}
}
private static class LoadReportCall {
private final StreamObserver<LoadStatsRequest> requestObserver;
@SuppressWarnings("unused")
private final StreamObserver<LoadStatsResponse> responseObserver;
private boolean cancelled;
LoadReportCall(StreamObserver<LoadStatsRequest> requestObserver,
StreamObserver<LoadStatsResponse> responseObserver) {
this.requestObserver = requestObserver;
this.responseObserver = responseObserver;
}
}
}