interop-testing: add flags to xds test client

This commit is contained in:
Eric Gribkoff 2020-07-28 12:43:00 -07:00 committed by GitHub
parent f444b7bcc0
commit 4fbe6bef7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 277 additions and 60 deletions

View File

@ -20,9 +20,15 @@ popd
git clone -b "${branch}" --single-branch --depth=1 https://github.com/grpc/grpc.git git clone -b "${branch}" --single-branch --depth=1 https://github.com/grpc/grpc.git
grpc/tools/run_tests/helper_scripts/prep_xds.sh grpc/tools/run_tests/helper_scripts/prep_xds.sh
# Test cases "path_matching" and "header_matching" are not included in "all",
# because not all interop clients in all languages support these new tests.
#
# TODO(ericgribkoff): remove "path_matching" and "header_matching" from
# --test_case after they are added into "all".
JAVA_OPTS=-Djava.util.logging.config.file=grpc-java/buildscripts/xds_logging.properties \ JAVA_OPTS=-Djava.util.logging.config.file=grpc-java/buildscripts/xds_logging.properties \
python3 grpc/tools/run_tests/run_xds_tests.py \ python3 grpc/tools/run_tests/run_xds_tests.py \
--test_case=all \ --test_case="all,path_matching,header_matching" \
--project_id=grpc-testing \ --project_id=grpc-testing \
--source_image=projects/grpc-testing/global/images/xds-test-server \ --source_image=projects/grpc-testing/global/images/xds-test-server \
--path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \ --path_to_server_binary=/java_server/grpc-java/interop-testing/build/install/grpc-interop-testing/bin/xds-test-server \

View File

@ -16,6 +16,9 @@
package io.grpc.testing.integration; package io.grpc.testing.integration;
import com.google.common.base.CaseFormat;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
@ -24,12 +27,16 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall; import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Grpc; import io.grpc.Grpc;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Server; import io.grpc.Server;
import io.grpc.Status;
import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder; import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
@ -38,6 +45,7 @@ import io.grpc.testing.integration.Messages.LoadBalancerStatsResponse;
import io.grpc.testing.integration.Messages.SimpleRequest; import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse; import io.grpc.testing.integration.Messages.SimpleResponse;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -47,6 +55,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -62,13 +71,24 @@ public final class XdsTestClient {
private int numChannels = 1; private int numChannels = 1;
private boolean printResponse = false; private boolean printResponse = false;
private int qps = 1; private int qps = 1;
private int rpcTimeoutSec = 2; private List<RpcType> rpcTypes = ImmutableList.of(RpcType.UNARY_CALL);
private EnumMap<RpcType, Metadata> metadata = new EnumMap<>(RpcType.class);
private int rpcTimeoutSec = 20;
private String server = "localhost:8080"; private String server = "localhost:8080";
private int statsPort = 8081; private int statsPort = 8081;
private Server statsServer; private Server statsServer;
private long currentRequestId; private long currentRequestId;
private ListeningScheduledExecutorService exec; private ListeningScheduledExecutorService exec;
private enum RpcType {
EMPTY_CALL,
UNARY_CALL;
public String toCamelCase() {
return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, toString());
}
}
/** /**
* The main application allowing this client to be launched from the command line. * The main application allowing this client to be launched from the command line.
*/ */
@ -111,12 +131,16 @@ public final class XdsTestClient {
break; break;
} }
String value = parts[1]; String value = parts[1];
if ("num_channels".equals(key)) { if ("metadata".equals(key)) {
metadata = parseMetadata(value);
} else if ("num_channels".equals(key)) {
numChannels = Integer.valueOf(value); numChannels = Integer.valueOf(value);
} else if ("print_response".equals(key)) { } else if ("print_response".equals(key)) {
printResponse = Boolean.valueOf(value); printResponse = Boolean.valueOf(value);
} else if ("qps".equals(key)) { } else if ("qps".equals(key)) {
qps = Integer.valueOf(value); qps = Integer.valueOf(value);
} else if ("rpc".equals(key)) {
rpcTypes = parseRpcs(value);
} else if ("rpc_timeout_sec".equals(key)) { } else if ("rpc_timeout_sec".equals(key)) {
rpcTimeoutSec = Integer.valueOf(value); rpcTimeoutSec = Integer.valueOf(value);
} else if ("server".equals(key)) { } else if ("server".equals(key)) {
@ -139,8 +163,12 @@ public final class XdsTestClient {
+ c.numChannels + c.numChannels
+ "\n --print_response=BOOL Write RPC response to stdout. Default: " + "\n --print_response=BOOL Write RPC response to stdout. Default: "
+ c.printResponse + c.printResponse
+ "\n --qps=INT Qps per channel. Default: " + "\n --qps=INT Qps per channel, for each type of RPC. Default: "
+ c.qps + c.qps
+ "\n --rpc=STR Types of RPCs to make, ',' separated string. RPCs can "
+ "be EmptyCall or UnaryCall. Default: UnaryCall"
+ "\n --metadata=STR The metadata to send with each RPC, in the format "
+ "EmptyCall:key1:value1,UnaryCall:key2:value2."
+ "\n --rpc_timeout_sec=INT Per RPC timeout seconds. Default: " + "\n --rpc_timeout_sec=INT Per RPC timeout seconds. Default: "
+ c.rpcTimeoutSec + c.rpcTimeoutSec
+ "\n --server=host:port Address of server. Default: " + "\n --server=host:port Address of server. Default: "
@ -152,6 +180,45 @@ public final class XdsTestClient {
} }
} }
private static List<RpcType> parseRpcs(String rpcArg) {
List<RpcType> rpcs = new ArrayList<>();
for (String rpc : Splitter.on(',').split(rpcArg)) {
rpcs.add(parseRpc(rpc));
}
return rpcs;
}
private static EnumMap<RpcType, Metadata> parseMetadata(String metadataArg) {
EnumMap<RpcType, Metadata> rpcMetadata = new EnumMap<>(RpcType.class);
for (String metadata : Splitter.on(',').omitEmptyStrings().split(metadataArg)) {
List<String> parts = Splitter.on(':').splitToList(metadata);
if (parts.size() != 3) {
throw new IllegalArgumentException("Invalid metadata: '" + metadata + "'");
}
RpcType rpc = parseRpc(parts.get(0));
String key = parts.get(1);
String value = parts.get(2);
Metadata md = new Metadata();
md.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value);
if (rpcMetadata.containsKey(rpc)) {
rpcMetadata.get(rpc).merge(md);
} else {
rpcMetadata.put(rpc, md);
}
}
return rpcMetadata;
}
private static RpcType parseRpc(String rpc) {
if ("EmptyCall".equals(rpc)) {
return RpcType.EMPTY_CALL;
} else if ("UnaryCall".equals(rpc)) {
return RpcType.UNARY_CALL;
} else {
throw new IllegalArgumentException("Unknown RPC: '" + rpc + "'");
}
}
private void run() { private void run() {
statsServer = NettyServerBuilder.forPort(statsPort).addService(new XdsStatsImpl()).build(); statsServer = NettyServerBuilder.forPort(statsPort).addService(new XdsStatsImpl()).build();
try { try {
@ -186,6 +253,11 @@ public final class XdsTestClient {
private void runQps() throws InterruptedException, ExecutionException { private void runQps() throws InterruptedException, ExecutionException {
final SettableFuture<Void> failure = SettableFuture.create(); final SettableFuture<Void> failure = SettableFuture.create();
final class PeriodicRpc implements Runnable { final class PeriodicRpc implements Runnable {
private final RpcType rpcType;
private PeriodicRpc(RpcType rpcType) {
this.rpcType = rpcType;
}
@Override @Override
public void run() { public void run() {
@ -197,69 +269,137 @@ public final class XdsTestClient {
savedWatchers.addAll(watchers); savedWatchers.addAll(watchers);
} }
SimpleRequest request = SimpleRequest.newBuilder().setFillServerId(true).build(); final Metadata headersToSend;
if (metadata.containsKey(rpcType)) {
headersToSend = metadata.get(rpcType);
} else {
headersToSend = new Metadata();
}
ManagedChannel channel = channels.get((int) (requestId % channels.size())); ManagedChannel channel = channels.get((int) (requestId % channels.size()));
final ClientCall<SimpleRequest, SimpleResponse> call = TestServiceGrpc.TestServiceStub stub = TestServiceGrpc.newStub(channel);
channel.newCall( final AtomicReference<ClientCall<?, ?>> clientCallRef = new AtomicReference<>();
TestServiceGrpc.getUnaryCallMethod(), final AtomicReference<String> hostnameRef = new AtomicReference<>();
CallOptions.DEFAULT.withDeadlineAfter(rpcTimeoutSec, TimeUnit.SECONDS)); stub =
call.start( stub.withDeadlineAfter(rpcTimeoutSec, TimeUnit.SECONDS)
new ClientCall.Listener<SimpleResponse>() { .withInterceptors(
private String hostname; new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
clientCallRef.set(call);
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.merge(headersToSend);
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
hostnameRef.set(headers.get(XdsTestServer.HOSTNAME_KEY));
super.onHeaders(headers);
}
},
headers);
}
};
}
});
@Override if (rpcType == RpcType.EMPTY_CALL) {
public void onMessage(SimpleResponse response) { stub.emptyCall(
hostname = response.getHostname(); EmptyProtos.Empty.getDefaultInstance(),
// TODO(ericgribkoff) Currently some test environments cannot access the stats RPC new StreamObserver<EmptyProtos.Empty>() {
// service and rely on parsing stdout. @Override
if (printResponse) { public void onCompleted() {
System.out.println( notifyWatchers(savedWatchers, rpcType, requestId, hostnameRef.get());
"Greeting: Hello world, this is "
+ hostname
+ ", from "
+ call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
} }
}
@Override @Override
public void onClose(Status status, Metadata trailers) { public void onError(Throwable t) {
if (printResponse && !status.isOk()) { notifyWatchers(savedWatchers, rpcType, requestId, hostnameRef.get());
logger.log(Level.WARNING, "Greeting RPC failed with status {0}", status);
} }
for (XdsStatsWatcher watcher : savedWatchers) {
watcher.rpcCompleted(requestId, hostname);
}
}
},
new Metadata());
call.sendMessage(request); @Override
call.request(1); public void onNext(EmptyProtos.Empty response) {}
call.halfClose(); });
} else if (rpcType == RpcType.UNARY_CALL) {
SimpleRequest request = SimpleRequest.newBuilder().setFillServerId(true).build();
stub.unaryCall(
request,
new StreamObserver<SimpleResponse>() {
@Override
public void onCompleted() {
notifyWatchers(savedWatchers, rpcType, requestId, hostnameRef.get());
}
@Override
public void onError(Throwable t) {
if (printResponse) {
logger.log(Level.WARNING, "Rpc failed: {0}", t);
}
notifyWatchers(savedWatchers, rpcType, requestId, hostnameRef.get());
}
@Override
public void onNext(SimpleResponse response) {
// TODO(ericgribkoff) Currently some test environments cannot access the stats RPC
// service and rely on parsing stdout.
if (printResponse) {
System.out.println(
"Greeting: Hello world, this is "
+ response.getHostname()
+ ", from "
+ clientCallRef
.get()
.getAttributes()
.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
}
// Use the hostname from the response if not present in the metadata.
// TODO(ericgribkoff) Delete when server is deployed that sets metadata value.
if (hostnameRef.get() == null) {
hostnameRef.set(response.getHostname());
}
}
});
}
} }
} }
long nanosPerQuery = TimeUnit.SECONDS.toNanos(1) / qps; long nanosPerQuery = TimeUnit.SECONDS.toNanos(1) / qps;
ListenableScheduledFuture<?> future =
exec.scheduleAtFixedRate(new PeriodicRpc(), 0, nanosPerQuery, TimeUnit.NANOSECONDS);
Futures.addCallback( for (RpcType rpcType : rpcTypes) {
future, ListenableScheduledFuture<?> future =
new FutureCallback<Object>() { exec.scheduleAtFixedRate(
new PeriodicRpc(rpcType), 0, nanosPerQuery, TimeUnit.NANOSECONDS);
@Override Futures.addCallback(
public void onFailure(Throwable t) { future,
failure.setException(t); new FutureCallback<Object>() {
}
@Override @Override
public void onSuccess(Object o) {} public void onFailure(Throwable t) {
}, failure.setException(t);
MoreExecutors.directExecutor()); }
@Override
public void onSuccess(Object o) {}
},
MoreExecutors.directExecutor());
}
failure.get(); failure.get();
} }
private void notifyWatchers(
Set<XdsStatsWatcher> watchers, RpcType rpcType, long requestId, String hostname) {
for (XdsStatsWatcher watcher : watchers) {
watcher.rpcCompleted(rpcType, requestId, hostname);
}
}
private class XdsStatsImpl extends LoadBalancerStatsServiceGrpc.LoadBalancerStatsServiceImplBase { private class XdsStatsImpl extends LoadBalancerStatsServiceGrpc.LoadBalancerStatsServiceImplBase {
@Override @Override
public void getClientStats( public void getClientStats(
@ -286,6 +426,8 @@ public final class XdsTestClient {
private final long startId; private final long startId;
private final long endId; private final long endId;
private final Map<String, Integer> rpcsByPeer = new HashMap<>(); private final Map<String, Integer> rpcsByPeer = new HashMap<>();
private final EnumMap<RpcType, Map<String, Integer>> rpcsByTypeAndPeer =
new EnumMap<>(RpcType.class);
private final Object lock = new Object(); private final Object lock = new Object();
private int noRemotePeer; private int noRemotePeer;
@ -295,7 +437,7 @@ public final class XdsTestClient {
this.endId = endId; this.endId = endId;
} }
void rpcCompleted(long requestId, @Nullable String hostname) { void rpcCompleted(RpcType rpcType, long requestId, @Nullable String hostname) {
synchronized (lock) { synchronized (lock) {
if (startId <= requestId && requestId < endId) { if (startId <= requestId && requestId < endId) {
if (hostname != null) { if (hostname != null) {
@ -304,6 +446,19 @@ public final class XdsTestClient {
} else { } else {
rpcsByPeer.put(hostname, 1); rpcsByPeer.put(hostname, 1);
} }
if (rpcsByTypeAndPeer.containsKey(rpcType)) {
if (rpcsByTypeAndPeer.get(rpcType).containsKey(hostname)) {
rpcsByTypeAndPeer
.get(rpcType)
.put(hostname, rpcsByTypeAndPeer.get(rpcType).get(hostname) + 1);
} else {
rpcsByTypeAndPeer.get(rpcType).put(hostname, 1);
}
} else {
Map<String, Integer> rpcMap = new HashMap<>();
rpcMap.put(hostname, 1);
rpcsByTypeAndPeer.put(rpcType, rpcMap);
}
} else { } else {
noRemotePeer += 1; noRemotePeer += 1;
} }
@ -325,6 +480,12 @@ public final class XdsTestClient {
LoadBalancerStatsResponse.Builder builder = LoadBalancerStatsResponse.newBuilder(); LoadBalancerStatsResponse.Builder builder = LoadBalancerStatsResponse.newBuilder();
synchronized (lock) { synchronized (lock) {
builder.putAllRpcsByPeer(rpcsByPeer); builder.putAllRpcsByPeer(rpcsByPeer);
for (Map.Entry<RpcType, Map<String, Integer>> entry : rpcsByTypeAndPeer.entrySet()) {
LoadBalancerStatsResponse.RpcsByPeer.Builder rpcs =
LoadBalancerStatsResponse.RpcsByPeer.newBuilder();
rpcs.putAllRpcsByPeer(entry.getValue());
builder.putRpcsByMethod(entry.getKey().toCamelCase(), rpcs.build());
}
builder.setNumFailures(noRemotePeer + (int) latch.getCount()); builder.setNumFailures(noRemotePeer + (int) latch.getCount());
} }
return builder.build(); return builder.build();

View File

@ -16,7 +16,13 @@
package io.grpc.testing.integration; package io.grpc.testing.integration;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.Server; import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus; import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.netty.NettyServerBuilder; import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService; import io.grpc.protobuf.services.ProtoReflectionService;
@ -32,12 +38,16 @@ import java.util.logging.Logger;
/** Interop test server that implements the xDS testing service. */ /** Interop test server that implements the xDS testing service. */
public final class XdsTestServer { public final class XdsTestServer {
static final Metadata.Key<String> HOSTNAME_KEY =
Metadata.Key.of("hostname", Metadata.ASCII_STRING_MARSHALLER);
private static Logger logger = Logger.getLogger(XdsTestServer.class.getName()); private static Logger logger = Logger.getLogger(XdsTestServer.class.getName());
private int port = 8080; private int port = 8080;
private String serverId = "java_server"; private String serverId = "java_server";
private HealthStatusManager health; private HealthStatusManager health;
private Server server; private Server server;
private String host;
/** /**
* The main application allowing this client to be launched from the command line. * The main application allowing this client to be launched from the command line.
@ -111,10 +121,18 @@ public final class XdsTestServer {
} }
private void start() throws Exception { private void start() throws Exception {
try {
host = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
logger.log(Level.SEVERE, "Failed to get host", e);
throw new RuntimeException(e);
}
health = new HealthStatusManager(); health = new HealthStatusManager();
server = server =
NettyServerBuilder.forPort(port) NettyServerBuilder.forPort(port)
.addService(new TestServiceImpl(serverId)) .addService(
ServerInterceptors.intercept(
new TestServiceImpl(serverId, host), new HostnameInterceptor(host)))
.addService(new XdsUpdateHealthServiceImpl(health)) .addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService()) .addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance()) .addService(ProtoReflectionService.newInstance())
@ -140,14 +158,16 @@ public final class XdsTestServer {
private final String serverId; private final String serverId;
private final String host; private final String host;
private TestServiceImpl(String serverId) { private TestServiceImpl(String serverId, String host) {
this.serverId = serverId; this.serverId = serverId;
try { this.host = host;
host = InetAddress.getLocalHost().getHostName(); }
} catch (UnknownHostException e) {
logger.log(Level.SEVERE, "Failed to get host", e); @Override
throw new RuntimeException(e); public void emptyCall(
} EmptyProtos.Empty req, StreamObserver<EmptyProtos.Empty> responseObserver) {
responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
responseObserver.onCompleted();
} }
@Override @Override
@ -182,4 +202,28 @@ public final class XdsTestServer {
responseObserver.onCompleted(); responseObserver.onCompleted();
} }
} }
private static class HostnameInterceptor implements ServerInterceptor {
private final String host;
private HostnameInterceptor(String host) {
this.host = host;
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
final Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
return next.startCall(
new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
responseHeaders.put(HOSTNAME_KEY, host);
super.sendHeaders(responseHeaders);
}
},
requestHeaders);
}
}
} }

View File

@ -195,4 +195,10 @@ message LoadBalancerStatsResponse {
map<string, int32> rpcs_by_peer = 1; map<string, int32> rpcs_by_peer = 1;
// The number of RPCs that failed to record a remote peer. // The number of RPCs that failed to record a remote peer.
int32 num_failures = 2; int32 num_failures = 2;
message RpcsByPeer {
// The number of completed RPCs for each peer.
map<string, int32> rpcs_by_peer = 1;
}
// The number of completed RPCs for each type (UnaryCall or EmptyCall).
map<string, RpcsByPeer> rpcs_by_method = 3;
} }