interop: create PSM security xDS interop tests - server & client (#7609)

Co-authored-by: Sergii Tkachenko <hi@sergii.org>
This commit is contained in:
sanjaypujare 2020-11-20 18:31:40 -08:00 committed by GitHub
parent 40b54079ce
commit a7530efd6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 81 additions and 15 deletions

View File

@ -26,6 +26,7 @@ dependencies {
project(':grpc-services'), project(':grpc-services'),
project(':grpc-stub'), project(':grpc-stub'),
project(':grpc-testing'), project(':grpc-testing'),
project(path: ':grpc-xds', configuration: 'shadow'),
libraries.google_auth_oauth2_http, libraries.google_auth_oauth2_http,
libraries.junit, libraries.junit,
libraries.truth libraries.truth
@ -33,8 +34,7 @@ dependencies {
compileOnly libraries.javax_annotation compileOnly libraries.javax_annotation
runtimeOnly libraries.opencensus_impl, runtimeOnly libraries.opencensus_impl,
libraries.netty_tcnative, libraries.netty_tcnative,
project(':grpc-grpclb'), project(':grpc-grpclb')
project(path: ':grpc-xds', configuration: 'shadow')
testImplementation project(':grpc-context').sourceSets.test.output, testImplementation project(':grpc-context').sourceSets.test.output,
libraries.mockito libraries.mockito
alpnagent libraries.jetty_alpn_agent alpnagent libraries.jetty_alpn_agent

View File

@ -33,11 +33,11 @@ import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Grpc; import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.Server; import io.grpc.Server;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder; import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.ClientConfigureRequest; import io.grpc.testing.integration.Messages.ClientConfigureRequest;
@ -49,6 +49,7 @@ import io.grpc.testing.integration.Messages.LoadBalancerStatsRequest;
import io.grpc.testing.integration.Messages.LoadBalancerStatsResponse; import io.grpc.testing.integration.Messages.LoadBalancerStatsResponse;
import io.grpc.testing.integration.Messages.SimpleRequest; import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse; import io.grpc.testing.integration.Messages.SimpleResponse;
import io.grpc.xds.XdsChannelCredentials;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumMap; import java.util.EnumMap;
import java.util.HashMap; import java.util.HashMap;
@ -81,6 +82,7 @@ public final class XdsTestClient {
private int qps = 1; private int qps = 1;
private volatile RpcConfig rpcConfig; private volatile RpcConfig rpcConfig;
private int rpcTimeoutSec = 20; private int rpcTimeoutSec = 20;
private boolean secureMode = false;
private String server = "localhost:8080"; private String server = "localhost:8080";
private int statsPort = 8081; private int statsPort = 8081;
private Server statsServer; private Server statsServer;
@ -147,6 +149,8 @@ public final class XdsTestClient {
server = value; server = value;
} else if ("stats_port".equals(key)) { } else if ("stats_port".equals(key)) {
statsPort = Integer.valueOf(value); statsPort = Integer.valueOf(value);
} else if ("secureMode".equals(key)) {
secureMode = Boolean.valueOf(value);
} else { } else {
System.err.println("Unknown argument: " + key); System.err.println("Unknown argument: " + key);
usage = true; usage = true;
@ -176,6 +180,8 @@ public final class XdsTestClient {
+ c.rpcTimeoutSec + c.rpcTimeoutSec
+ "\n --server=host:port Address of server. Default: " + "\n --server=host:port Address of server. Default: "
+ c.server + c.server
+ "\n --secureMode=BOOLEAN Use true to enable XdsCredentials. Default: "
+ c.secureMode
+ "\n --stats_port=INT Port to expose peer distribution stats service. " + "\n --stats_port=INT Port to expose peer distribution stats service. "
+ "Default: " + "Default: "
+ c.statsPort); + c.statsPort);
@ -231,7 +237,13 @@ public final class XdsTestClient {
try { try {
statsServer.start(); statsServer.start();
for (int i = 0; i < numChannels; i++) { for (int i = 0; i < numChannels; i++) {
channels.add(NettyChannelBuilder.forTarget(server).usePlaintext().build()); channels.add(
Grpc.newChannelBuilder(
server,
secureMode
? XdsChannelCredentials.create(InsecureChannelCredentials.create())
: InsecureChannelCredentials.create())
.build());
} }
exec = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor()); exec = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor());
runQps(); runQps();

View File

@ -19,6 +19,7 @@ package io.grpc.testing.integration;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.Contexts; import io.grpc.Contexts;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.InsecureServerCredentials;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.Server; import io.grpc.Server;
import io.grpc.ServerCall; import io.grpc.ServerCall;
@ -32,6 +33,8 @@ import io.grpc.services.HealthStatusManager;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.SimpleRequest; import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse; import io.grpc.testing.integration.Messages.SimpleResponse;
import io.grpc.xds.XdsServerBuilder;
import io.grpc.xds.XdsServerCredentials;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -51,9 +54,12 @@ public final class XdsTestServer {
private static Logger logger = Logger.getLogger(XdsTestServer.class.getName()); private static Logger logger = Logger.getLogger(XdsTestServer.class.getName());
private int port = 8080; private int port = 8080;
private int maintenancePort = 8080;
private boolean secureMode = false;
private String serverId = "java_server"; private String serverId = "java_server";
private HealthStatusManager health; private HealthStatusManager health;
private Server server; private Server server;
private Server maintenanceServer;
private String host; private String host;
/** /**
@ -103,6 +109,10 @@ public final class XdsTestServer {
String value = parts[1]; String value = parts[1];
if ("port".equals(key)) { if ("port".equals(key)) {
port = Integer.valueOf(value); port = Integer.valueOf(value);
} else if ("maintenance_port".equals(key)) {
maintenancePort = Integer.valueOf(value);
} else if ("secure_mode".equals(key)) {
secureMode = Boolean.parseBoolean(value);
} else if ("server_id".equals(key)) { } else if ("server_id".equals(key)) {
serverId = value; serverId = value;
} else { } else {
@ -112,14 +122,30 @@ public final class XdsTestServer {
} }
} }
if (secureMode && (port == maintenancePort)) {
System.err.println(
"port and maintenance_port should be different for secure mode: port="
+ port
+ ", maintenance_port="
+ maintenancePort);
usage = true;
}
if (usage) { if (usage) {
XdsTestServer s = new XdsTestServer(); XdsTestServer s = new XdsTestServer();
System.err.println( System.err.println(
"Usage: [ARGS...]" "Usage: [ARGS...]"
+ "\n" + "\n"
+ "\n --port=INT listening port for server." + "\n --port=INT listening port for test server."
+ "\n Default: " + "\n Default: "
+ s.port + s.port
+ "\n --maintenance_port=INT listening port for other servers."
+ "\n Default: "
+ s.maintenancePort
+ "\n --secure_mode=BOOLEAN Use true to enable XdsCredentials."
+ " port and maintenance_port should be different for secure mode."
+ "\n Default: "
+ s.secureMode
+ "\n --server_id=STRING server ID for response." + "\n --server_id=STRING server ID for response."
+ "\n Default: " + "\n Default: "
+ s.serverId); + s.serverId);
@ -135,30 +161,58 @@ public final class XdsTestServer {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
health = new HealthStatusManager(); health = new HealthStatusManager();
server = if (secureMode) {
NettyServerBuilder.forPort(port) server =
.addService( XdsServerBuilder.forPort(
ServerInterceptors.intercept( port, XdsServerCredentials.create(InsecureServerCredentials.create()))
new TestServiceImpl(serverId, host), new TestInfoInterceptor(host))) .addService(
.addService(new XdsUpdateHealthServiceImpl(health)) ServerInterceptors.intercept(
.addService(health.getHealthService()) new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
.addService(ProtoReflectionService.newInstance()) .build()
.build() .start();
.start(); maintenanceServer =
NettyServerBuilder.forPort(maintenancePort)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.build()
.start();
} else {
server =
NettyServerBuilder.forPort(port)
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.build()
.start();
maintenanceServer = null;
}
health.setStatus("", ServingStatus.SERVING); health.setStatus("", ServingStatus.SERVING);
} }
private void stop() throws Exception { private void stop() throws Exception {
server.shutdownNow(); server.shutdownNow();
if (maintenanceServer != null) {
maintenanceServer.shutdownNow();
}
if (!server.awaitTermination(5, TimeUnit.SECONDS)) { if (!server.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Timed out waiting for server shutdown"); System.err.println("Timed out waiting for server shutdown");
} }
if (maintenanceServer != null && !maintenanceServer.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Timed out waiting for maintenanceServer shutdown");
}
} }
private void blockUntilShutdown() throws InterruptedException { private void blockUntilShutdown() throws InterruptedException {
if (server != null) { if (server != null) {
server.awaitTermination(); server.awaitTermination();
} }
if (maintenanceServer != null) {
maintenanceServer.awaitTermination();
}
} }
private static class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { private static class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {