interop-testing: support dynamic configuration and accumulated stats for xDS test client (#7549)

This commit is contained in:
Chengyuan Zhang 2020-11-03 10:33:41 -08:00 committed by GitHub
parent b2bf5fa7f5
commit 8e04df99f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 106 additions and 46 deletions

View File

@ -40,6 +40,11 @@ import io.grpc.Server;
import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder; import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.ClientConfigureRequest;
import io.grpc.testing.integration.Messages.ClientConfigureRequest.RpcType;
import io.grpc.testing.integration.Messages.ClientConfigureResponse;
import io.grpc.testing.integration.Messages.LoadBalancerAccumulatedStatsRequest;
import io.grpc.testing.integration.Messages.LoadBalancerAccumulatedStatsResponse;
import io.grpc.testing.integration.Messages.LoadBalancerStatsRequest; import io.grpc.testing.integration.Messages.LoadBalancerStatsRequest;
import io.grpc.testing.integration.Messages.LoadBalancerStatsResponse; import io.grpc.testing.integration.Messages.LoadBalancerStatsResponse;
import io.grpc.testing.integration.Messages.SimpleRequest; import io.grpc.testing.integration.Messages.SimpleRequest;
@ -55,6 +60,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -67,12 +73,14 @@ public final class XdsTestClient {
private final Set<XdsStatsWatcher> watchers = new HashSet<>(); private final Set<XdsStatsWatcher> watchers = new HashSet<>();
private final Object lock = new Object(); private final Object lock = new Object();
private final List<ManagedChannel> channels = new ArrayList<>(); private final List<ManagedChannel> channels = new ArrayList<>();
private final AtomicInteger rpcsStarted = new AtomicInteger();
private final AtomicInteger rpcsFailed = new AtomicInteger();
private final AtomicInteger rpcsSucceeded = new AtomicInteger();
private int numChannels = 1; private int numChannels = 1;
private boolean printResponse = false; private boolean printResponse = false;
private int qps = 1; private int qps = 1;
private List<RpcType> rpcTypes = ImmutableList.of(RpcType.UNARY_CALL); private volatile RpcConfig rpcConfig;
private EnumMap<RpcType, Metadata> metadata = new EnumMap<>(RpcType.class);
private int rpcTimeoutSec = 20; private int rpcTimeoutSec = 20;
private String server = "localhost:8080"; private String server = "localhost:8080";
private int statsPort = 8081; private int statsPort = 8081;
@ -80,15 +88,6 @@ public final class XdsTestClient {
private long currentRequestId; private long currentRequestId;
private ListeningScheduledExecutorService exec; private ListeningScheduledExecutorService exec;
private enum RpcType {
EMPTY_CALL,
UNARY_CALL;
public String toCamelCase() {
return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, toString());
}
}
/** /**
* The main application allowing this client to be launched from the command line. * The main application allowing this client to be launched from the command line.
*/ */
@ -113,6 +112,8 @@ public final class XdsTestClient {
private void parseArgs(String[] args) { private void parseArgs(String[] args) {
boolean usage = false; boolean usage = false;
List<RpcType> rpcTypes = ImmutableList.of(RpcType.UNARY_CALL);
EnumMap<RpcType, Metadata> metadata = new EnumMap<>(RpcType.class);
for (String arg : args) { for (String arg : args) {
if (!arg.startsWith("--")) { if (!arg.startsWith("--")) {
System.err.println("All arguments must start with '--': " + arg); System.err.println("All arguments must start with '--': " + arg);
@ -153,6 +154,7 @@ public final class XdsTestClient {
break; break;
} }
} }
rpcConfig = new RpcConfig(rpcTypes, metadata);
if (usage) { if (usage) {
XdsTestClient c = new XdsTestClient(); XdsTestClient c = new XdsTestClient();
@ -167,8 +169,10 @@ public final class XdsTestClient {
+ c.qps + c.qps
+ "\n --rpc=STR Types of RPCs to make, ',' separated string. RPCs can " + "\n --rpc=STR Types of RPCs to make, ',' separated string. RPCs can "
+ "be EmptyCall or UnaryCall. Default: UnaryCall" + "be EmptyCall or UnaryCall. Default: UnaryCall"
+ "\n[deprecated] Use XdsUpdateClientConfigureService"
+ "\n --metadata=STR The metadata to send with each RPC, in the format " + "\n --metadata=STR The metadata to send with each RPC, in the format "
+ "EmptyCall:key1:value1,UnaryCall:key2:value2." + "EmptyCall:key1:value1,UnaryCall:key2:value2."
+ "\n[deprecated] Use XdsUpdateClientConfigureService"
+ "\n --rpc_timeout_sec=INT Per RPC timeout seconds. Default: " + "\n --rpc_timeout_sec=INT Per RPC timeout seconds. Default: "
+ c.rpcTimeoutSec + c.rpcTimeoutSec
+ "\n --server=host:port Address of server. Default: " + "\n --server=host:port Address of server. Default: "
@ -220,7 +224,11 @@ public final class XdsTestClient {
} }
private void run() { private void run() {
statsServer = NettyServerBuilder.forPort(statsPort).addService(new XdsStatsImpl()).build(); statsServer =
NettyServerBuilder.forPort(statsPort)
.addService(new XdsStatsImpl())
.addService(new ConfigureUpdateServiceImpl())
.build();
try { try {
statsServer.start(); statsServer.start();
for (int i = 0; i < numChannels; i++) { for (int i = 0; i < numChannels; i++) {
@ -253,14 +261,20 @@ public final class XdsTestClient {
private void runQps() throws InterruptedException, ExecutionException { private void runQps() throws InterruptedException, ExecutionException {
final SettableFuture<Void> failure = SettableFuture.create(); final SettableFuture<Void> failure = SettableFuture.create();
final class PeriodicRpc implements Runnable { final class PeriodicRpc implements Runnable {
private final RpcType rpcType;
private PeriodicRpc(RpcType rpcType) {
this.rpcType = rpcType;
}
@Override @Override
public void run() { public void run() {
RpcConfig config = rpcConfig;
for (RpcType type : config.rpcTypes) {
Metadata headers = config.metadata.get(type);
if (headers == null) {
headers = new Metadata();
}
makeRpc(type, headers);
}
}
private void makeRpc(final RpcType rpcType, final Metadata headersToSend) {
final long requestId; final long requestId;
final Set<XdsStatsWatcher> savedWatchers = new HashSet<>(); final Set<XdsStatsWatcher> savedWatchers = new HashSet<>();
synchronized (lock) { synchronized (lock) {
@ -269,12 +283,6 @@ public final class XdsTestClient {
savedWatchers.addAll(watchers); savedWatchers.addAll(watchers);
} }
final Metadata headersToSend;
if (metadata.containsKey(rpcType)) {
headersToSend = metadata.get(rpcType);
} else {
headersToSend = new Metadata();
}
ManagedChannel channel = channels.get((int) (requestId % channels.size())); ManagedChannel channel = channels.get((int) (requestId % channels.size()));
TestServiceGrpc.TestServiceStub stub = TestServiceGrpc.newStub(channel); TestServiceGrpc.TestServiceStub stub = TestServiceGrpc.newStub(channel);
final AtomicReference<ClientCall<?, ?>> clientCallRef = new AtomicReference<>(); final AtomicReference<ClientCall<?, ?>> clientCallRef = new AtomicReference<>();
@ -314,17 +322,18 @@ public final class XdsTestClient {
new StreamObserver<EmptyProtos.Empty>() { new StreamObserver<EmptyProtos.Empty>() {
@Override @Override
public void onCompleted() { public void onCompleted() {
notifyWatchers(savedWatchers, rpcType, requestId, hostnameRef.get()); handleRpcCompleted(requestId, rpcType, hostnameRef.get(), savedWatchers);
} }
@Override @Override
public void onError(Throwable t) { public void onError(Throwable t) {
notifyWatchers(savedWatchers, rpcType, requestId, hostnameRef.get()); handleRpcError(requestId, rpcType, hostnameRef.get(), savedWatchers);
} }
@Override @Override
public void onNext(EmptyProtos.Empty response) {} public void onNext(EmptyProtos.Empty response) {}
}); });
rpcsStarted.getAndIncrement();
} else if (rpcType == RpcType.UNARY_CALL) { } else if (rpcType == RpcType.UNARY_CALL) {
SimpleRequest request = SimpleRequest.newBuilder().setFillServerId(true).build(); SimpleRequest request = SimpleRequest.newBuilder().setFillServerId(true).build();
stub.unaryCall( stub.unaryCall(
@ -332,7 +341,7 @@ public final class XdsTestClient {
new StreamObserver<SimpleResponse>() { new StreamObserver<SimpleResponse>() {
@Override @Override
public void onCompleted() { public void onCompleted() {
notifyWatchers(savedWatchers, rpcType, requestId, hostnameRef.get()); handleRpcCompleted(requestId, rpcType, hostnameRef.get(), savedWatchers);
} }
@Override @Override
@ -340,7 +349,7 @@ public final class XdsTestClient {
if (printResponse) { if (printResponse) {
logger.log(Level.WARNING, "Rpc failed: {0}", t); logger.log(Level.WARNING, "Rpc failed: {0}", t);
} }
notifyWatchers(savedWatchers, rpcType, requestId, hostnameRef.get()); handleRpcError(requestId, rpcType, hostnameRef.get(), savedWatchers);
} }
@Override @Override
@ -364,31 +373,39 @@ public final class XdsTestClient {
} }
} }
}); });
rpcsStarted.getAndIncrement();
} }
} }
private void handleRpcCompleted(long requestId, RpcType rpcType, String hostname,
Set<XdsStatsWatcher> watchers) {
rpcsSucceeded.getAndIncrement();
notifyWatchers(watchers, rpcType, requestId, hostname);
}
private void handleRpcError(long requestId, RpcType rpcType, String hostname,
Set<XdsStatsWatcher> watchers) {
rpcsFailed.getAndIncrement();
notifyWatchers(watchers, rpcType, requestId, hostname);
}
} }
long nanosPerQuery = TimeUnit.SECONDS.toNanos(1) / qps; long nanosPerQuery = TimeUnit.SECONDS.toNanos(1) / qps;
ListenableScheduledFuture<?> future =
exec.scheduleAtFixedRate(new PeriodicRpc(), 0, nanosPerQuery, TimeUnit.NANOSECONDS);
Futures.addCallback(
future,
new FutureCallback<Object>() {
for (RpcType rpcType : rpcTypes) { @Override
ListenableScheduledFuture<?> future = public void onFailure(Throwable t) {
exec.scheduleAtFixedRate( failure.setException(t);
new PeriodicRpc(rpcType), 0, nanosPerQuery, TimeUnit.NANOSECONDS); }
Futures.addCallback( @Override
future, public void onSuccess(Object o) {}
new FutureCallback<Object>() { },
MoreExecutors.directExecutor());
@Override
public void onFailure(Throwable t) {
failure.setException(t);
}
@Override
public void onSuccess(Object o) {}
},
MoreExecutors.directExecutor());
}
failure.get(); failure.get();
} }
@ -400,6 +417,22 @@ public final class XdsTestClient {
} }
} }
private final class ConfigureUpdateServiceImpl extends
XdsUpdateClientConfigureServiceGrpc.XdsUpdateClientConfigureServiceImplBase {
@Override
public void configure(ClientConfigureRequest request,
StreamObserver<ClientConfigureResponse> responseObserver) {
EnumMap<RpcType, Metadata> newMetadata = new EnumMap<>(RpcType.class);
for (ClientConfigureRequest.Metadata metadata : request.getMetadataList()) {
Metadata md = new Metadata();
md.put(Metadata.Key.of(metadata.getKey(), Metadata.ASCII_STRING_MARSHALLER),
metadata.getValue());
newMetadata.put(metadata.getType(), md);
}
rpcConfig = new RpcConfig(request.getTypesList(), newMetadata);
}
}
private class XdsStatsImpl extends LoadBalancerStatsServiceGrpc.LoadBalancerStatsServiceImplBase { private class XdsStatsImpl extends LoadBalancerStatsServiceGrpc.LoadBalancerStatsServiceImplBase {
@Override @Override
public void getClientStats( public void getClientStats(
@ -418,6 +451,29 @@ public final class XdsTestClient {
responseObserver.onNext(response); responseObserver.onNext(response);
responseObserver.onCompleted(); responseObserver.onCompleted();
} }
@Override
public void getClientAccumulatedStats(LoadBalancerAccumulatedStatsRequest request,
StreamObserver<LoadBalancerAccumulatedStatsResponse> responseObserver) {
responseObserver.onNext(
LoadBalancerAccumulatedStatsResponse.newBuilder()
.setNumRpcsStarted(rpcsStarted.get())
.setNumRpcsSucceeded(rpcsSucceeded.get())
.setNumRpcsFailed(rpcsFailed.get())
.build());
responseObserver.onCompleted();
}
}
/** RPC configurations that can be dynamically updated. */
private static final class RpcConfig {
private final List<RpcType> rpcTypes;
private final EnumMap<RpcType, Metadata> metadata;
private RpcConfig(List<RpcType> rpcTypes, EnumMap<RpcType, Metadata> metadata) {
this.rpcTypes = rpcTypes;
this.metadata = metadata;
}
} }
/** Records the remote peer distribution for a given range of RPCs. */ /** Records the remote peer distribution for a given range of RPCs. */
@ -484,11 +540,15 @@ public final class XdsTestClient {
LoadBalancerStatsResponse.RpcsByPeer.Builder rpcs = LoadBalancerStatsResponse.RpcsByPeer.Builder rpcs =
LoadBalancerStatsResponse.RpcsByPeer.newBuilder(); LoadBalancerStatsResponse.RpcsByPeer.newBuilder();
rpcs.putAllRpcsByPeer(entry.getValue()); rpcs.putAllRpcsByPeer(entry.getValue());
builder.putRpcsByMethod(entry.getKey().toCamelCase(), rpcs.build()); builder.putRpcsByMethod(getRpcTypeString(entry.getKey()), rpcs.build());
} }
builder.setNumFailures(noRemotePeer + (int) latch.getCount()); builder.setNumFailures(noRemotePeer + (int) latch.getCount());
} }
return builder.build(); return builder.build();
} }
private static String getRpcTypeString(RpcType rpcType) {
return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, rpcType.name());
}
} }
} }