diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java index 6f2d9e9edd..47cde98969 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java @@ -60,7 +60,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -73,9 +72,9 @@ public final class XdsTestClient { private final Set watchers = new HashSet<>(); private final Object lock = new Object(); private final List channels = new ArrayList<>(); - private final AtomicInteger rpcsStarted = new AtomicInteger(); - private final AtomicInteger rpcsFailed = new AtomicInteger(); - private final AtomicInteger rpcsSucceeded = new AtomicInteger(); + private final Map rpcsStartedByMethod = new HashMap<>(); + private final Map rpcsFailedByMethod = new HashMap<>(); + private final Map rpcsSucceededByMethod = new HashMap<>(); private int numChannels = 1; private boolean printResponse = false; @@ -333,7 +332,6 @@ public final class XdsTestClient { @Override public void onNext(EmptyProtos.Empty response) {} }); - rpcsStarted.getAndIncrement(); } else if (rpcType == RpcType.UNARY_CALL) { SimpleRequest request = SimpleRequest.newBuilder().setFillServerId(true).build(); stub.unaryCall( @@ -373,19 +371,39 @@ public final class XdsTestClient { } } }); - rpcsStarted.getAndIncrement(); + } else { + throw new AssertionError("Unknown RPC type: " + rpcType); + } + synchronized (lock) { + Integer startedBase = rpcsStartedByMethod.get(rpcType.name()); + if (startedBase == null) { + startedBase = 0; + } + rpcsStartedByMethod.put(rpcType.name(), startedBase + 1); } } private void handleRpcCompleted(long requestId, RpcType rpcType, String hostname, Set watchers) { - rpcsSucceeded.getAndIncrement(); + synchronized (lock) { + Integer succeededBase = rpcsSucceededByMethod.get(rpcType.name()); + if (succeededBase == null) { + succeededBase = 0; + } + rpcsSucceededByMethod.put(rpcType.name(), succeededBase + 1); + } notifyWatchers(watchers, rpcType, requestId, hostname); } private void handleRpcError(long requestId, RpcType rpcType, String hostname, Set watchers) { - rpcsFailed.getAndIncrement(); + synchronized (lock) { + Integer failedBase = rpcsFailedByMethod.get(rpcType.name()); + if (failedBase == null) { + failedBase = 0; + } + rpcsFailedByMethod.put(rpcType.name(), failedBase + 1); + } notifyWatchers(watchers, rpcType, requestId, hostname); } } @@ -457,12 +475,15 @@ public final class XdsTestClient { @Override public void getClientAccumulatedStats(LoadBalancerAccumulatedStatsRequest request, StreamObserver responseObserver) { - responseObserver.onNext( - LoadBalancerAccumulatedStatsResponse.newBuilder() - .setNumRpcsStarted(rpcsStarted.get()) - .setNumRpcsSucceeded(rpcsSucceeded.get()) - .setNumRpcsFailed(rpcsFailed.get()) - .build()); + LoadBalancerAccumulatedStatsResponse.Builder responseBuilder = + LoadBalancerAccumulatedStatsResponse.newBuilder(); + synchronized (lock) { + responseBuilder + .putAllNumRpcsStartedByMethod(rpcsStartedByMethod) + .putAllNumRpcsSucceededByMethod(rpcsSucceededByMethod) + .putAllNumRpcsFailedByMethod(rpcsFailedByMethod); + } + responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); } } diff --git a/interop-testing/src/main/proto/grpc/testing/messages.proto b/interop-testing/src/main/proto/grpc/testing/messages.proto index d175057f02..797adb72d6 100644 --- a/interop-testing/src/main/proto/grpc/testing/messages.proto +++ b/interop-testing/src/main/proto/grpc/testing/messages.proto @@ -208,12 +208,12 @@ message LoadBalancerAccumulatedStatsRequest {} // Accumulated stats for RPCs sent by a test client. message LoadBalancerAccumulatedStatsResponse { - // The total number of RPCs have ever issued. - int32 num_rpcs_started = 1; - // The total number of RPCs have ever completed successfully. - int32 num_rpcs_succeeded = 2; - // The total number of RPCs have ever failed. - int32 num_rpcs_failed = 3; + // The total number of RPCs have ever issued for each type. + map num_rpcs_started_by_method = 1; + // The total number of RPCs have ever completed successfully for each type. + map num_rpcs_succeeded_by_method = 2; + // The total number of RPCs have ever failed for each type. + map num_rpcs_failed_by_method = 3; } // Configurations for a test client.