diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java index e9a9f14d55..10d40cc7e6 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java @@ -38,6 +38,7 @@ import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Server; +import io.grpc.Status; import io.grpc.netty.NettyServerBuilder; import io.grpc.protobuf.services.ProtoReflectionService; import io.grpc.services.ChannelzService; @@ -47,12 +48,14 @@ import io.grpc.testing.integration.Messages.ClientConfigureRequest.RpcType; import io.grpc.testing.integration.Messages.ClientConfigureResponse; import io.grpc.testing.integration.Messages.LoadBalancerAccumulatedStatsRequest; import io.grpc.testing.integration.Messages.LoadBalancerAccumulatedStatsResponse; +import io.grpc.testing.integration.Messages.LoadBalancerAccumulatedStatsResponse.MethodStats; import io.grpc.testing.integration.Messages.LoadBalancerStatsRequest; import io.grpc.testing.integration.Messages.LoadBalancerStatsResponse; import io.grpc.testing.integration.Messages.SimpleRequest; import io.grpc.testing.integration.Messages.SimpleResponse; import io.grpc.xds.XdsChannelCredentials; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; @@ -67,6 +70,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; /** Client for xDS interop tests. */ public final class XdsTestClient { @@ -75,15 +79,13 @@ public final class XdsTestClient { private final Set watchers = new HashSet<>(); private final Object lock = new Object(); private final List channels = new ArrayList<>(); - private final Map rpcsStartedByMethod = new HashMap<>(); - private final Map rpcsFailedByMethod = new HashMap<>(); - private final Map rpcsSucceededByMethod = new HashMap<>(); + private final StatsAccumulator statsAccumulator = new StatsAccumulator(); private static final int CHANNELZ_MAX_PAGE_SIZE = 100; private int numChannels = 1; private boolean printResponse = false; private int qps = 1; - private volatile RpcConfig rpcConfig; + private volatile List rpcConfigs; private int rpcTimeoutSec = 20; private boolean secureMode = false; private String server = "localhost:8080"; @@ -160,7 +162,15 @@ public final class XdsTestClient { break; } } - rpcConfig = new RpcConfig(rpcTypes, metadata); + List configs = new ArrayList<>(); + for (RpcType type : rpcTypes) { + Metadata md = new Metadata(); + if (metadata.containsKey(type)) { + md = metadata.get(type); + } + configs.add(new RpcConfig(type, md, rpcTimeoutSec)); + } + rpcConfigs = Collections.unmodifiableList(configs); if (usage) { XdsTestClient c = new XdsTestClient(); @@ -280,17 +290,13 @@ public final class XdsTestClient { @Override public void run() { - RpcConfig config = rpcConfig; - for (RpcType type : config.rpcTypes) { - Metadata headers = config.metadata.get(type); - if (headers == null) { - headers = new Metadata(); - } - makeRpc(type, headers); + List configs = rpcConfigs; + for (RpcConfig cfg : configs) { + makeRpc(cfg); } } - private void makeRpc(final RpcType rpcType, final Metadata headersToSend) { + private void makeRpc(final RpcConfig config) { final long requestId; final Set savedWatchers = new HashSet<>(); synchronized (lock) { @@ -304,7 +310,7 @@ public final class XdsTestClient { final AtomicReference> clientCallRef = new AtomicReference<>(); final AtomicReference hostnameRef = new AtomicReference<>(); stub = - stub.withDeadlineAfter(rpcTimeoutSec, TimeUnit.SECONDS) + stub.withDeadlineAfter(config.timeoutSec, TimeUnit.SECONDS) .withInterceptors( new ClientInterceptor() { @Override @@ -317,7 +323,7 @@ public final class XdsTestClient { return new SimpleForwardingClientCall(call) { @Override public void start(Listener responseListener, Metadata headers) { - headers.merge(headersToSend); + headers.merge(config.metadata); super.start( new SimpleForwardingClientCallListener(responseListener) { @Override @@ -332,31 +338,32 @@ public final class XdsTestClient { } }); - if (rpcType == RpcType.EMPTY_CALL) { + if (config.rpcType == RpcType.EMPTY_CALL) { stub.emptyCall( EmptyProtos.Empty.getDefaultInstance(), new StreamObserver() { @Override public void onCompleted() { - handleRpcCompleted(requestId, rpcType, hostnameRef.get(), savedWatchers); + handleRpcCompleted(requestId, config.rpcType, hostnameRef.get(), savedWatchers); } @Override public void onError(Throwable t) { - handleRpcError(requestId, rpcType, savedWatchers); + handleRpcError(requestId, config.rpcType, Status.fromThrowable(t), + savedWatchers); } @Override public void onNext(EmptyProtos.Empty response) {} }); - } else if (rpcType == RpcType.UNARY_CALL) { + } else if (config.rpcType == RpcType.UNARY_CALL) { SimpleRequest request = SimpleRequest.newBuilder().setFillServerId(true).build(); stub.unaryCall( request, new StreamObserver() { @Override public void onCompleted() { - handleRpcCompleted(requestId, rpcType, hostnameRef.get(), savedWatchers); + handleRpcCompleted(requestId, config.rpcType, hostnameRef.get(), savedWatchers); } @Override @@ -364,7 +371,8 @@ public final class XdsTestClient { if (printResponse) { logger.log(Level.WARNING, "Rpc failed: {0}", t); } - handleRpcError(requestId, rpcType, savedWatchers); + handleRpcError(requestId, config.rpcType, Status.fromThrowable(t), + savedWatchers); } @Override @@ -389,37 +397,20 @@ public final class XdsTestClient { } }); } else { - throw new AssertionError("Unknown RPC type: " + rpcType); - } - synchronized (lock) { - Integer startedBase = rpcsStartedByMethod.get(rpcType.name()); - if (startedBase == null) { - startedBase = 0; - } - rpcsStartedByMethod.put(rpcType.name(), startedBase + 1); + throw new AssertionError("Unknown RPC type: " + config.rpcType); } + statsAccumulator.recordRpcStarted(config.rpcType); } private void handleRpcCompleted(long requestId, RpcType rpcType, String hostname, Set watchers) { - synchronized (lock) { - Integer succeededBase = rpcsSucceededByMethod.get(rpcType.name()); - if (succeededBase == null) { - succeededBase = 0; - } - rpcsSucceededByMethod.put(rpcType.name(), succeededBase + 1); - } + statsAccumulator.recordRpcFinished(rpcType, Status.OK); notifyWatchers(watchers, rpcType, requestId, hostname); } - private void handleRpcError(long requestId, RpcType rpcType, Set watchers) { - synchronized (lock) { - Integer failedBase = rpcsFailedByMethod.get(rpcType.name()); - if (failedBase == null) { - failedBase = 0; - } - rpcsFailedByMethod.put(rpcType.name(), failedBase + 1); - } + private void handleRpcError(long requestId, RpcType rpcType, Status status, + Set watchers) { + statsAccumulator.recordRpcFinished(rpcType, status); notifyWatchers(watchers, rpcType, requestId, null); } } @@ -463,7 +454,13 @@ public final class XdsTestClient { metadata.getValue()); newMetadata.put(metadata.getType(), md); } - rpcConfig = new RpcConfig(request.getTypesList(), newMetadata); + List configs = new ArrayList<>(); + for (RpcType type : request.getTypesList()) { + Metadata md = newMetadata.containsKey(type) ? newMetadata.get(type) : new Metadata(); + int timeout = request.getTimeoutSec() != 0 ? request.getTimeoutSec() : rpcTimeoutSec; + configs.add(new RpcConfig(type, md, timeout)); + } + rpcConfigs = Collections.unmodifiableList(configs); responseObserver.onNext(ClientConfigureResponse.getDefaultInstance()); responseObserver.onCompleted(); } @@ -491,27 +488,81 @@ public final class XdsTestClient { @Override public void getClientAccumulatedStats(LoadBalancerAccumulatedStatsRequest request, StreamObserver responseObserver) { - LoadBalancerAccumulatedStatsResponse.Builder responseBuilder = - LoadBalancerAccumulatedStatsResponse.newBuilder(); - synchronized (lock) { - responseBuilder - .putAllNumRpcsStartedByMethod(rpcsStartedByMethod) - .putAllNumRpcsSucceededByMethod(rpcsSucceededByMethod) - .putAllNumRpcsFailedByMethod(rpcsFailedByMethod); - } - responseObserver.onNext(responseBuilder.build()); + responseObserver.onNext(statsAccumulator.getRpcStats()); responseObserver.onCompleted(); } } - /** RPC configurations that can be dynamically updated. */ + /** Configuration applies to the specific type of RPCs. */ private static final class RpcConfig { - private final List rpcTypes; - private final EnumMap metadata; + private final RpcType rpcType; + private final Metadata metadata; + private final int timeoutSec; - private RpcConfig(List rpcTypes, EnumMap metadata) { - this.rpcTypes = rpcTypes; + private RpcConfig(RpcType rpcType, Metadata metadata, int timeoutSec) { + this.rpcType = rpcType; this.metadata = metadata; + this.timeoutSec = timeoutSec; + } + } + + /** Stats recorder for test RPCs. */ + @ThreadSafe + private static final class StatsAccumulator { + private final Map rpcsStartedByMethod = new HashMap<>(); + // TODO(chengyuanzhang): delete the following two after corresponding fields deleted in proto. + private final Map rpcsFailedByMethod = new HashMap<>(); + private final Map rpcsSucceededByMethod = new HashMap<>(); + private final Map> rpcStatusByMethod = new HashMap<>(); + + private synchronized void recordRpcStarted(RpcType rpcType) { + String method = getRpcTypeString(rpcType); + int count = rpcsStartedByMethod.containsKey(method) ? rpcsStartedByMethod.get(method) : 0; + rpcsStartedByMethod.put(method, count + 1); + } + + private synchronized void recordRpcFinished(RpcType rpcType, Status status) { + String method = getRpcTypeString(rpcType); + if (status.isOk()) { + int count = + rpcsSucceededByMethod.containsKey(method) ? rpcsSucceededByMethod.get(method) : 0; + rpcsSucceededByMethod.put(method, count + 1); + } else { + int count = rpcsFailedByMethod.containsKey(method) ? rpcsFailedByMethod.get(method) : 0; + rpcsFailedByMethod.put(method, count + 1); + } + int statusCode = status.getCode().value(); + Map statusCounts = rpcStatusByMethod.get(method); + if (statusCounts == null) { + statusCounts = new HashMap<>(); + rpcStatusByMethod.put(method, statusCounts); + } + int count = statusCounts.containsKey(statusCode) ? statusCounts.get(statusCode) : 0; + statusCounts.put(statusCode, count + 1); + } + + @SuppressWarnings("deprecation") + private synchronized LoadBalancerAccumulatedStatsResponse getRpcStats() { + LoadBalancerAccumulatedStatsResponse.Builder builder = + LoadBalancerAccumulatedStatsResponse.newBuilder(); + builder.putAllNumRpcsStartedByMethod(rpcsStartedByMethod); + builder.putAllNumRpcsSucceededByMethod(rpcsSucceededByMethod); + builder.putAllNumRpcsFailedByMethod(rpcsFailedByMethod); + + for (String method : rpcsStartedByMethod.keySet()) { + MethodStats.Builder methodStatsBuilder = MethodStats.newBuilder(); + methodStatsBuilder.setRpcsStarted(rpcsStartedByMethod.get(method)); + if (rpcStatusByMethod.containsKey(method)) { + methodStatsBuilder.putAllResult(rpcStatusByMethod.get(method)); + } + builder.putStatsPerMethod(method, methodStatsBuilder.build()); + } + return builder.build(); + } + + // e.g., RpcType.UNARY_CALL -> "UNARY_CALL" + private static String getRpcTypeString(RpcType rpcType) { + return rpcType.name(); } } @@ -586,6 +637,7 @@ public final class XdsTestClient { return builder.build(); } + // e.g., RpcType.UNARY_CALL -> "UnaryCall" private static String getRpcTypeString(RpcType rpcType) { return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, rpcType.name()); } diff --git a/interop-testing/src/main/proto/grpc/testing/messages.proto b/interop-testing/src/main/proto/grpc/testing/messages.proto index 81724a5062..9ac53cd89e 100644 --- a/interop-testing/src/main/proto/grpc/testing/messages.proto +++ b/interop-testing/src/main/proto/grpc/testing/messages.proto @@ -100,6 +100,7 @@ message SimpleResponse { string username = 2; // OAuth scope. string oauth_scope = 3; + // Server ID. This must be unique among different server instances, // but the same across all RPC's made to a particular server instance. string server_id = 4; @@ -191,15 +192,14 @@ message LoadBalancerStatsRequest { } message LoadBalancerStatsResponse { - // The number of RPCs succeeded for each peer. - map rpcs_by_peer = 1; - // The number of RPCs that failed. - int32 num_failures = 2; message RpcsByPeer { - // The number of RPCs succeeded for each peer. + // The number of completed RPCs for each peer. map rpcs_by_peer = 1; } - // The number of RPCs succeeded for each type (UnaryCall or EmptyCall). + // The number of completed RPCs for each peer. + map rpcs_by_peer = 1; + // The number of RPCs that failed to record a remote peer. + int32 num_failures = 2; map rpcs_by_method = 3; } @@ -209,11 +209,27 @@ message LoadBalancerAccumulatedStatsRequest {} // Accumulated stats for RPCs sent by a test client. message LoadBalancerAccumulatedStatsResponse { // The total number of RPCs have ever issued for each type. - map num_rpcs_started_by_method = 1; + // Deprecated: use stats_per_method.rpcs_started instead. + map num_rpcs_started_by_method = 1 [deprecated = true]; // The total number of RPCs have ever completed successfully for each type. - map num_rpcs_succeeded_by_method = 2; + // Deprecated: use stats_per_method.result instead. + map num_rpcs_succeeded_by_method = 2 [deprecated = true]; // The total number of RPCs have ever failed for each type. - map num_rpcs_failed_by_method = 3; + // Deprecated: use stats_per_method.result instead. + map num_rpcs_failed_by_method = 3 [deprecated = true]; + + message MethodStats { + // The number of RPCs that were started for this method. + int32 rpcs_started = 1; + + // The number of RPCs that completed with each status for this method. The + // key is the integral value of a google.rpc.Code; the value is the count. + map result = 2; + } + + // Per-method RPC statistics. The key is the RpcType in string form; e.g. + // 'EMPTY_CALL' or 'UNARY_CALL' + map stats_per_method = 4; } // Configurations for a test client. @@ -235,6 +251,9 @@ message ClientConfigureRequest { repeated RpcType types = 1; // The collection of custom metadata to be attached to RPCs sent by the client. repeated Metadata metadata = 2; + // The deadline to use, in seconds, for all RPCs. If unset or zero, the + // client will use the default from the command-line. + int32 timeout_sec = 3; } // Response for updating a test client's configuration.