interop-testing: implement test client support for xDS timeout test (#7837)

Changes the xDS interop test client to support timeout test.

- Synced xDS test proto messages with grpc-proto.
- Changed RpcConfig to be the configuration for per test method type. Added timeoutSec for its deadline configuration.
- Changed accumulated stats to include RPC status instead of just succeeded/failed.
This commit is contained in:
Chengyuan Zhang 2021-01-27 17:09:01 -08:00 committed by GitHub
parent ac2ead70b4
commit 14a38ef9b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 139 additions and 68 deletions

View File

@ -38,6 +38,7 @@ import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;
import io.grpc.services.ChannelzService;
@ -47,12 +48,14 @@ import io.grpc.testing.integration.Messages.ClientConfigureRequest.RpcType;
import io.grpc.testing.integration.Messages.ClientConfigureResponse;
import io.grpc.testing.integration.Messages.LoadBalancerAccumulatedStatsRequest;
import io.grpc.testing.integration.Messages.LoadBalancerAccumulatedStatsResponse;
import io.grpc.testing.integration.Messages.LoadBalancerAccumulatedStatsResponse.MethodStats;
import io.grpc.testing.integration.Messages.LoadBalancerStatsRequest;
import io.grpc.testing.integration.Messages.LoadBalancerStatsResponse;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import io.grpc.xds.XdsChannelCredentials;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
@ -67,6 +70,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
/** Client for xDS interop tests. */
public final class XdsTestClient {
@ -75,15 +79,13 @@ public final class XdsTestClient {
private final Set<XdsStatsWatcher> watchers = new HashSet<>();
private final Object lock = new Object();
private final List<ManagedChannel> channels = new ArrayList<>();
private final Map<String, Integer> rpcsStartedByMethod = new HashMap<>();
private final Map<String, Integer> rpcsFailedByMethod = new HashMap<>();
private final Map<String, Integer> rpcsSucceededByMethod = new HashMap<>();
private final StatsAccumulator statsAccumulator = new StatsAccumulator();
private static final int CHANNELZ_MAX_PAGE_SIZE = 100;
private int numChannels = 1;
private boolean printResponse = false;
private int qps = 1;
private volatile RpcConfig rpcConfig;
private volatile List<RpcConfig> rpcConfigs;
private int rpcTimeoutSec = 20;
private boolean secureMode = false;
private String server = "localhost:8080";
@ -160,7 +162,15 @@ public final class XdsTestClient {
break;
}
}
rpcConfig = new RpcConfig(rpcTypes, metadata);
List<RpcConfig> configs = new ArrayList<>();
for (RpcType type : rpcTypes) {
Metadata md = new Metadata();
if (metadata.containsKey(type)) {
md = metadata.get(type);
}
configs.add(new RpcConfig(type, md, rpcTimeoutSec));
}
rpcConfigs = Collections.unmodifiableList(configs);
if (usage) {
XdsTestClient c = new XdsTestClient();
@ -280,17 +290,13 @@ public final class XdsTestClient {
@Override
public void run() {
RpcConfig config = rpcConfig;
for (RpcType type : config.rpcTypes) {
Metadata headers = config.metadata.get(type);
if (headers == null) {
headers = new Metadata();
}
makeRpc(type, headers);
List<RpcConfig> configs = rpcConfigs;
for (RpcConfig cfg : configs) {
makeRpc(cfg);
}
}
private void makeRpc(final RpcType rpcType, final Metadata headersToSend) {
private void makeRpc(final RpcConfig config) {
final long requestId;
final Set<XdsStatsWatcher> savedWatchers = new HashSet<>();
synchronized (lock) {
@ -304,7 +310,7 @@ public final class XdsTestClient {
final AtomicReference<ClientCall<?, ?>> clientCallRef = new AtomicReference<>();
final AtomicReference<String> hostnameRef = new AtomicReference<>();
stub =
stub.withDeadlineAfter(rpcTimeoutSec, TimeUnit.SECONDS)
stub.withDeadlineAfter(config.timeoutSec, TimeUnit.SECONDS)
.withInterceptors(
new ClientInterceptor() {
@Override
@ -317,7 +323,7 @@ public final class XdsTestClient {
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.merge(headersToSend);
headers.merge(config.metadata);
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
@ -332,31 +338,32 @@ public final class XdsTestClient {
}
});
if (rpcType == RpcType.EMPTY_CALL) {
if (config.rpcType == RpcType.EMPTY_CALL) {
stub.emptyCall(
EmptyProtos.Empty.getDefaultInstance(),
new StreamObserver<EmptyProtos.Empty>() {
@Override
public void onCompleted() {
handleRpcCompleted(requestId, rpcType, hostnameRef.get(), savedWatchers);
handleRpcCompleted(requestId, config.rpcType, hostnameRef.get(), savedWatchers);
}
@Override
public void onError(Throwable t) {
handleRpcError(requestId, rpcType, savedWatchers);
handleRpcError(requestId, config.rpcType, Status.fromThrowable(t),
savedWatchers);
}
@Override
public void onNext(EmptyProtos.Empty response) {}
});
} else if (rpcType == RpcType.UNARY_CALL) {
} else if (config.rpcType == RpcType.UNARY_CALL) {
SimpleRequest request = SimpleRequest.newBuilder().setFillServerId(true).build();
stub.unaryCall(
request,
new StreamObserver<SimpleResponse>() {
@Override
public void onCompleted() {
handleRpcCompleted(requestId, rpcType, hostnameRef.get(), savedWatchers);
handleRpcCompleted(requestId, config.rpcType, hostnameRef.get(), savedWatchers);
}
@Override
@ -364,7 +371,8 @@ public final class XdsTestClient {
if (printResponse) {
logger.log(Level.WARNING, "Rpc failed: {0}", t);
}
handleRpcError(requestId, rpcType, savedWatchers);
handleRpcError(requestId, config.rpcType, Status.fromThrowable(t),
savedWatchers);
}
@Override
@ -389,37 +397,20 @@ public final class XdsTestClient {
}
});
} else {
throw new AssertionError("Unknown RPC type: " + rpcType);
}
synchronized (lock) {
Integer startedBase = rpcsStartedByMethod.get(rpcType.name());
if (startedBase == null) {
startedBase = 0;
}
rpcsStartedByMethod.put(rpcType.name(), startedBase + 1);
throw new AssertionError("Unknown RPC type: " + config.rpcType);
}
statsAccumulator.recordRpcStarted(config.rpcType);
}
private void handleRpcCompleted(long requestId, RpcType rpcType, String hostname,
Set<XdsStatsWatcher> watchers) {
synchronized (lock) {
Integer succeededBase = rpcsSucceededByMethod.get(rpcType.name());
if (succeededBase == null) {
succeededBase = 0;
}
rpcsSucceededByMethod.put(rpcType.name(), succeededBase + 1);
}
statsAccumulator.recordRpcFinished(rpcType, Status.OK);
notifyWatchers(watchers, rpcType, requestId, hostname);
}
private void handleRpcError(long requestId, RpcType rpcType, Set<XdsStatsWatcher> watchers) {
synchronized (lock) {
Integer failedBase = rpcsFailedByMethod.get(rpcType.name());
if (failedBase == null) {
failedBase = 0;
}
rpcsFailedByMethod.put(rpcType.name(), failedBase + 1);
}
private void handleRpcError(long requestId, RpcType rpcType, Status status,
Set<XdsStatsWatcher> watchers) {
statsAccumulator.recordRpcFinished(rpcType, status);
notifyWatchers(watchers, rpcType, requestId, null);
}
}
@ -463,7 +454,13 @@ public final class XdsTestClient {
metadata.getValue());
newMetadata.put(metadata.getType(), md);
}
rpcConfig = new RpcConfig(request.getTypesList(), newMetadata);
List<RpcConfig> configs = new ArrayList<>();
for (RpcType type : request.getTypesList()) {
Metadata md = newMetadata.containsKey(type) ? newMetadata.get(type) : new Metadata();
int timeout = request.getTimeoutSec() != 0 ? request.getTimeoutSec() : rpcTimeoutSec;
configs.add(new RpcConfig(type, md, timeout));
}
rpcConfigs = Collections.unmodifiableList(configs);
responseObserver.onNext(ClientConfigureResponse.getDefaultInstance());
responseObserver.onCompleted();
}
@ -491,27 +488,81 @@ public final class XdsTestClient {
@Override
public void getClientAccumulatedStats(LoadBalancerAccumulatedStatsRequest request,
StreamObserver<LoadBalancerAccumulatedStatsResponse> responseObserver) {
LoadBalancerAccumulatedStatsResponse.Builder responseBuilder =
LoadBalancerAccumulatedStatsResponse.newBuilder();
synchronized (lock) {
responseBuilder
.putAllNumRpcsStartedByMethod(rpcsStartedByMethod)
.putAllNumRpcsSucceededByMethod(rpcsSucceededByMethod)
.putAllNumRpcsFailedByMethod(rpcsFailedByMethod);
}
responseObserver.onNext(responseBuilder.build());
responseObserver.onNext(statsAccumulator.getRpcStats());
responseObserver.onCompleted();
}
}
/** RPC configurations that can be dynamically updated. */
/** Configuration applies to the specific type of RPCs. */
private static final class RpcConfig {
private final List<RpcType> rpcTypes;
private final EnumMap<RpcType, Metadata> metadata;
private final RpcType rpcType;
private final Metadata metadata;
private final int timeoutSec;
private RpcConfig(List<RpcType> rpcTypes, EnumMap<RpcType, Metadata> metadata) {
this.rpcTypes = rpcTypes;
private RpcConfig(RpcType rpcType, Metadata metadata, int timeoutSec) {
this.rpcType = rpcType;
this.metadata = metadata;
this.timeoutSec = timeoutSec;
}
}
/** Stats recorder for test RPCs. */
@ThreadSafe
private static final class StatsAccumulator {
private final Map<String, Integer> rpcsStartedByMethod = new HashMap<>();
// TODO(chengyuanzhang): delete the following two after corresponding fields deleted in proto.
private final Map<String, Integer> rpcsFailedByMethod = new HashMap<>();
private final Map<String, Integer> rpcsSucceededByMethod = new HashMap<>();
private final Map<String, Map<Integer, Integer>> rpcStatusByMethod = new HashMap<>();
private synchronized void recordRpcStarted(RpcType rpcType) {
String method = getRpcTypeString(rpcType);
int count = rpcsStartedByMethod.containsKey(method) ? rpcsStartedByMethod.get(method) : 0;
rpcsStartedByMethod.put(method, count + 1);
}
private synchronized void recordRpcFinished(RpcType rpcType, Status status) {
String method = getRpcTypeString(rpcType);
if (status.isOk()) {
int count =
rpcsSucceededByMethod.containsKey(method) ? rpcsSucceededByMethod.get(method) : 0;
rpcsSucceededByMethod.put(method, count + 1);
} else {
int count = rpcsFailedByMethod.containsKey(method) ? rpcsFailedByMethod.get(method) : 0;
rpcsFailedByMethod.put(method, count + 1);
}
int statusCode = status.getCode().value();
Map<Integer, Integer> statusCounts = rpcStatusByMethod.get(method);
if (statusCounts == null) {
statusCounts = new HashMap<>();
rpcStatusByMethod.put(method, statusCounts);
}
int count = statusCounts.containsKey(statusCode) ? statusCounts.get(statusCode) : 0;
statusCounts.put(statusCode, count + 1);
}
@SuppressWarnings("deprecation")
private synchronized LoadBalancerAccumulatedStatsResponse getRpcStats() {
LoadBalancerAccumulatedStatsResponse.Builder builder =
LoadBalancerAccumulatedStatsResponse.newBuilder();
builder.putAllNumRpcsStartedByMethod(rpcsStartedByMethod);
builder.putAllNumRpcsSucceededByMethod(rpcsSucceededByMethod);
builder.putAllNumRpcsFailedByMethod(rpcsFailedByMethod);
for (String method : rpcsStartedByMethod.keySet()) {
MethodStats.Builder methodStatsBuilder = MethodStats.newBuilder();
methodStatsBuilder.setRpcsStarted(rpcsStartedByMethod.get(method));
if (rpcStatusByMethod.containsKey(method)) {
methodStatsBuilder.putAllResult(rpcStatusByMethod.get(method));
}
builder.putStatsPerMethod(method, methodStatsBuilder.build());
}
return builder.build();
}
// e.g., RpcType.UNARY_CALL -> "UNARY_CALL"
private static String getRpcTypeString(RpcType rpcType) {
return rpcType.name();
}
}
@ -586,6 +637,7 @@ public final class XdsTestClient {
return builder.build();
}
// e.g., RpcType.UNARY_CALL -> "UnaryCall"
private static String getRpcTypeString(RpcType rpcType) {
return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, rpcType.name());
}

View File

@ -100,6 +100,7 @@ message SimpleResponse {
string username = 2;
// OAuth scope.
string oauth_scope = 3;
// Server ID. This must be unique among different server instances,
// but the same across all RPC's made to a particular server instance.
string server_id = 4;
@ -191,15 +192,14 @@ message LoadBalancerStatsRequest {
}
message LoadBalancerStatsResponse {
// The number of RPCs succeeded for each peer.
map<string, int32> rpcs_by_peer = 1;
// The number of RPCs that failed.
int32 num_failures = 2;
message RpcsByPeer {
// The number of RPCs succeeded for each peer.
// The number of completed RPCs for each peer.
map<string, int32> rpcs_by_peer = 1;
}
// The number of RPCs succeeded for each type (UnaryCall or EmptyCall).
// The number of completed RPCs for each peer.
map<string, int32> rpcs_by_peer = 1;
// The number of RPCs that failed to record a remote peer.
int32 num_failures = 2;
map<string, RpcsByPeer> rpcs_by_method = 3;
}
@ -209,11 +209,27 @@ message LoadBalancerAccumulatedStatsRequest {}
// Accumulated stats for RPCs sent by a test client.
message LoadBalancerAccumulatedStatsResponse {
// The total number of RPCs have ever issued for each type.
map<string, int32> num_rpcs_started_by_method = 1;
// Deprecated: use stats_per_method.rpcs_started instead.
map<string, int32> num_rpcs_started_by_method = 1 [deprecated = true];
// The total number of RPCs have ever completed successfully for each type.
map<string, int32> num_rpcs_succeeded_by_method = 2;
// Deprecated: use stats_per_method.result instead.
map<string, int32> num_rpcs_succeeded_by_method = 2 [deprecated = true];
// The total number of RPCs have ever failed for each type.
map<string, int32> num_rpcs_failed_by_method = 3;
// Deprecated: use stats_per_method.result instead.
map<string, int32> num_rpcs_failed_by_method = 3 [deprecated = true];
message MethodStats {
// The number of RPCs that were started for this method.
int32 rpcs_started = 1;
// The number of RPCs that completed with each status for this method. The
// key is the integral value of a google.rpc.Code; the value is the count.
map<int32, int32> result = 2;
}
// Per-method RPC statistics. The key is the RpcType in string form; e.g.
// 'EMPTY_CALL' or 'UNARY_CALL'
map<string, MethodStats> stats_per_method = 4;
}
// Configurations for a test client.
@ -235,6 +251,9 @@ message ClientConfigureRequest {
repeated RpcType types = 1;
// The collection of custom metadata to be attached to RPCs sent by the client.
repeated Metadata metadata = 2;
// The deadline to use, in seconds, for all RPCs. If unset or zero, the
// client will use the default from the command-line.
int32 timeout_sec = 3;
}
// Response for updating a test client's configuration.