From 8e04df99f359a69c33ef5c83c7f9e9b418916f66 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 3 Nov 2020 10:33:41 -0800 Subject: [PATCH] interop-testing: support dynamic configuration and accumulated stats for xDS test client (#7549) --- .../testing/integration/XdsTestClient.java | 152 ++++++++++++------ 1 file changed, 106 insertions(+), 46 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java index 73af15981e..aba91dcae9 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestClient.java @@ -40,6 +40,11 @@ import io.grpc.Server; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; import io.grpc.stub.StreamObserver; +import io.grpc.testing.integration.Messages.ClientConfigureRequest; +import io.grpc.testing.integration.Messages.ClientConfigureRequest.RpcType; +import io.grpc.testing.integration.Messages.ClientConfigureResponse; +import io.grpc.testing.integration.Messages.LoadBalancerAccumulatedStatsRequest; +import io.grpc.testing.integration.Messages.LoadBalancerAccumulatedStatsResponse; import io.grpc.testing.integration.Messages.LoadBalancerStatsRequest; import io.grpc.testing.integration.Messages.LoadBalancerStatsResponse; import io.grpc.testing.integration.Messages.SimpleRequest; @@ -55,6 +60,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -67,12 +73,14 @@ public final class XdsTestClient { private final Set watchers = new HashSet<>(); private final Object lock = new Object(); private final List channels = new ArrayList<>(); + private final AtomicInteger rpcsStarted = new AtomicInteger(); + private final AtomicInteger rpcsFailed = new AtomicInteger(); + private final AtomicInteger rpcsSucceeded = new AtomicInteger(); private int numChannels = 1; private boolean printResponse = false; private int qps = 1; - private List rpcTypes = ImmutableList.of(RpcType.UNARY_CALL); - private EnumMap metadata = new EnumMap<>(RpcType.class); + private volatile RpcConfig rpcConfig; private int rpcTimeoutSec = 20; private String server = "localhost:8080"; private int statsPort = 8081; @@ -80,15 +88,6 @@ public final class XdsTestClient { private long currentRequestId; private ListeningScheduledExecutorService exec; - private enum RpcType { - EMPTY_CALL, - UNARY_CALL; - - public String toCamelCase() { - return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, toString()); - } - } - /** * The main application allowing this client to be launched from the command line. */ @@ -113,6 +112,8 @@ public final class XdsTestClient { private void parseArgs(String[] args) { boolean usage = false; + List rpcTypes = ImmutableList.of(RpcType.UNARY_CALL); + EnumMap metadata = new EnumMap<>(RpcType.class); for (String arg : args) { if (!arg.startsWith("--")) { System.err.println("All arguments must start with '--': " + arg); @@ -153,6 +154,7 @@ public final class XdsTestClient { break; } } + rpcConfig = new RpcConfig(rpcTypes, metadata); if (usage) { XdsTestClient c = new XdsTestClient(); @@ -167,8 +169,10 @@ public final class XdsTestClient { + c.qps + "\n --rpc=STR Types of RPCs to make, ',' separated string. RPCs can " + "be EmptyCall or UnaryCall. Default: UnaryCall" + + "\n[deprecated] Use XdsUpdateClientConfigureService" + "\n --metadata=STR The metadata to send with each RPC, in the format " + "EmptyCall:key1:value1,UnaryCall:key2:value2." + + "\n[deprecated] Use XdsUpdateClientConfigureService" + "\n --rpc_timeout_sec=INT Per RPC timeout seconds. Default: " + c.rpcTimeoutSec + "\n --server=host:port Address of server. Default: " @@ -220,7 +224,11 @@ public final class XdsTestClient { } private void run() { - statsServer = NettyServerBuilder.forPort(statsPort).addService(new XdsStatsImpl()).build(); + statsServer = + NettyServerBuilder.forPort(statsPort) + .addService(new XdsStatsImpl()) + .addService(new ConfigureUpdateServiceImpl()) + .build(); try { statsServer.start(); for (int i = 0; i < numChannels; i++) { @@ -253,14 +261,20 @@ public final class XdsTestClient { private void runQps() throws InterruptedException, ExecutionException { final SettableFuture failure = SettableFuture.create(); final class PeriodicRpc implements Runnable { - private final RpcType rpcType; - - private PeriodicRpc(RpcType rpcType) { - this.rpcType = rpcType; - } @Override public void run() { + RpcConfig config = rpcConfig; + for (RpcType type : config.rpcTypes) { + Metadata headers = config.metadata.get(type); + if (headers == null) { + headers = new Metadata(); + } + makeRpc(type, headers); + } + } + + private void makeRpc(final RpcType rpcType, final Metadata headersToSend) { final long requestId; final Set savedWatchers = new HashSet<>(); synchronized (lock) { @@ -269,12 +283,6 @@ public final class XdsTestClient { savedWatchers.addAll(watchers); } - final Metadata headersToSend; - if (metadata.containsKey(rpcType)) { - headersToSend = metadata.get(rpcType); - } else { - headersToSend = new Metadata(); - } ManagedChannel channel = channels.get((int) (requestId % channels.size())); TestServiceGrpc.TestServiceStub stub = TestServiceGrpc.newStub(channel); final AtomicReference> clientCallRef = new AtomicReference<>(); @@ -314,17 +322,18 @@ public final class XdsTestClient { new StreamObserver() { @Override public void onCompleted() { - notifyWatchers(savedWatchers, rpcType, requestId, hostnameRef.get()); + handleRpcCompleted(requestId, rpcType, hostnameRef.get(), savedWatchers); } @Override public void onError(Throwable t) { - notifyWatchers(savedWatchers, rpcType, requestId, hostnameRef.get()); + handleRpcError(requestId, rpcType, hostnameRef.get(), savedWatchers); } @Override public void onNext(EmptyProtos.Empty response) {} }); + rpcsStarted.getAndIncrement(); } else if (rpcType == RpcType.UNARY_CALL) { SimpleRequest request = SimpleRequest.newBuilder().setFillServerId(true).build(); stub.unaryCall( @@ -332,7 +341,7 @@ public final class XdsTestClient { new StreamObserver() { @Override public void onCompleted() { - notifyWatchers(savedWatchers, rpcType, requestId, hostnameRef.get()); + handleRpcCompleted(requestId, rpcType, hostnameRef.get(), savedWatchers); } @Override @@ -340,7 +349,7 @@ public final class XdsTestClient { if (printResponse) { logger.log(Level.WARNING, "Rpc failed: {0}", t); } - notifyWatchers(savedWatchers, rpcType, requestId, hostnameRef.get()); + handleRpcError(requestId, rpcType, hostnameRef.get(), savedWatchers); } @Override @@ -364,31 +373,39 @@ public final class XdsTestClient { } } }); + rpcsStarted.getAndIncrement(); } } + + private void handleRpcCompleted(long requestId, RpcType rpcType, String hostname, + Set watchers) { + rpcsSucceeded.getAndIncrement(); + notifyWatchers(watchers, rpcType, requestId, hostname); + } + + private void handleRpcError(long requestId, RpcType rpcType, String hostname, + Set watchers) { + rpcsFailed.getAndIncrement(); + notifyWatchers(watchers, rpcType, requestId, hostname); + } } long nanosPerQuery = TimeUnit.SECONDS.toNanos(1) / qps; + ListenableScheduledFuture future = + exec.scheduleAtFixedRate(new PeriodicRpc(), 0, nanosPerQuery, TimeUnit.NANOSECONDS); + Futures.addCallback( + future, + new FutureCallback() { - for (RpcType rpcType : rpcTypes) { - ListenableScheduledFuture future = - exec.scheduleAtFixedRate( - new PeriodicRpc(rpcType), 0, nanosPerQuery, TimeUnit.NANOSECONDS); + @Override + public void onFailure(Throwable t) { + failure.setException(t); + } - Futures.addCallback( - future, - new FutureCallback() { - - @Override - public void onFailure(Throwable t) { - failure.setException(t); - } - - @Override - public void onSuccess(Object o) {} - }, - MoreExecutors.directExecutor()); - } + @Override + public void onSuccess(Object o) {} + }, + MoreExecutors.directExecutor()); failure.get(); } @@ -400,6 +417,22 @@ public final class XdsTestClient { } } + private final class ConfigureUpdateServiceImpl extends + XdsUpdateClientConfigureServiceGrpc.XdsUpdateClientConfigureServiceImplBase { + @Override + public void configure(ClientConfigureRequest request, + StreamObserver responseObserver) { + EnumMap newMetadata = new EnumMap<>(RpcType.class); + for (ClientConfigureRequest.Metadata metadata : request.getMetadataList()) { + Metadata md = new Metadata(); + md.put(Metadata.Key.of(metadata.getKey(), Metadata.ASCII_STRING_MARSHALLER), + metadata.getValue()); + newMetadata.put(metadata.getType(), md); + } + rpcConfig = new RpcConfig(request.getTypesList(), newMetadata); + } + } + private class XdsStatsImpl extends LoadBalancerStatsServiceGrpc.LoadBalancerStatsServiceImplBase { @Override public void getClientStats( @@ -418,6 +451,29 @@ public final class XdsTestClient { responseObserver.onNext(response); responseObserver.onCompleted(); } + + @Override + public void getClientAccumulatedStats(LoadBalancerAccumulatedStatsRequest request, + StreamObserver responseObserver) { + responseObserver.onNext( + LoadBalancerAccumulatedStatsResponse.newBuilder() + .setNumRpcsStarted(rpcsStarted.get()) + .setNumRpcsSucceeded(rpcsSucceeded.get()) + .setNumRpcsFailed(rpcsFailed.get()) + .build()); + responseObserver.onCompleted(); + } + } + + /** RPC configurations that can be dynamically updated. */ + private static final class RpcConfig { + private final List rpcTypes; + private final EnumMap metadata; + + private RpcConfig(List rpcTypes, EnumMap metadata) { + this.rpcTypes = rpcTypes; + this.metadata = metadata; + } } /** Records the remote peer distribution for a given range of RPCs. */ @@ -484,11 +540,15 @@ public final class XdsTestClient { LoadBalancerStatsResponse.RpcsByPeer.Builder rpcs = LoadBalancerStatsResponse.RpcsByPeer.newBuilder(); rpcs.putAllRpcsByPeer(entry.getValue()); - builder.putRpcsByMethod(entry.getKey().toCamelCase(), rpcs.build()); + builder.putRpcsByMethod(getRpcTypeString(entry.getKey()), rpcs.build()); } builder.setNumFailures(noRemotePeer + (int) latch.getCount()); } return builder.build(); } + + private static String getRpcTypeString(RpcType rpcType) { + return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, rpcType.name()); + } } }