From 6ceb37ae4cf5f7f10e5591d3cac901b2d673fcbd Mon Sep 17 00:00:00 2001 From: Jakob Buchgraber Date: Thu, 12 Feb 2015 13:15:33 -0800 Subject: [PATCH] Add QPS Server and TLS support to QPS Client. --- benchmarks/build.gradle | 32 ++- .../java/io/grpc/benchmarks/qps/Main.java | 61 +++++ .../qps/{Client.java => QpsClient.java} | 76 ++++-- .../io/grpc/benchmarks/qps/QpsServer.java | 246 ++++++++++++++++++ 4 files changed, 377 insertions(+), 38 deletions(-) create mode 100644 benchmarks/src/main/java/io/grpc/benchmarks/qps/Main.java rename benchmarks/src/main/java/io/grpc/benchmarks/qps/{Client.java => QpsClient.java} (83%) create mode 100644 benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsServer.java diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle index 10a843fbd8..9bfc2fc281 100644 --- a/benchmarks/build.gradle +++ b/benchmarks/build.gradle @@ -2,8 +2,8 @@ apply plugin: 'application' apply plugin: 'protobuf' description = "gRPC Benchmarks" +mainClassName = "io.grpc.benchmarks.qps.Main" -mainClassName = "io.grpc.benchmarks.qps.Client" buildscript { repositories { @@ -14,28 +14,32 @@ buildscript { } } +configurations { + alpnboot +} + dependencies { compile project(':grpc-core'), project(':grpc-netty'), project(':grpc-okhttp'), project(':grpc-stub'), - project(':grpc-testing'), + project(':grpc-integration-testing'), libraries.junit, libraries.mockito, libraries.hdrhistogram + + // Determine the correct version of Jetty ALPN boot to use based + // on the Java version. + def alpnboot_prefix = 'org.mortbay.jetty.alpn:alpn-boot:' + def alpnboot_version = '8.1.2.v20141202' + if (JavaVersion.current().ordinal() < JavaVersion.VERSION_1_8.ordinal()) { + alpnboot_version = '7.1.2.v20141202' + } + + alpnboot alpnboot_prefix + alpnboot_version + + applicationDefaultJvmArgs = ["-Xbootclasspath/p:" + configurations.alpnboot.asPath] } protobufCodeGenPlugins = ["java_plugin:$rootDir/compiler/build/binaries/java_pluginExecutable/java_plugin"] generateProto.dependsOn ':grpc-compiler:java_pluginExecutable' - -// Allow execution of test client and server. -task execute(dependsOn: classes, type:JavaExec) { - main = project.hasProperty('mainClass') ? project.mainClass : 'io.grpc.benchmarks.qps.Client' - classpath = sourceSets.main.runtimeClasspath - workingDir = project.rootDir - - // If appArgs were provided, set the program arguments. - if (project.hasProperty("appArgs")) { - args = Eval.me(appArgs) - } -} diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Main.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/Main.java new file mode 100644 index 0000000000..120e0ac4a5 --- /dev/null +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/Main.java @@ -0,0 +1,61 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.benchmarks.qps; + +import java.util.Arrays; + +public class Main { + public static void main(String... args) throws Exception { + if (args.length == 0) { + printUsage(); + } else { + String arg = args[0].substring(2); + if ("server".equals(arg)) { + QpsServer server = new QpsServer(); + server.run(Arrays.copyOfRange(args, 1, args.length)); + } else if ("client".equals(arg)) { + QpsClient client = new QpsClient(); + client.run(Arrays.copyOfRange(args, 1, args.length)); + } else { + printUsage(); + } + } + } + + private static void printUsage() { + System.out.println("Usage: Pass one of the two as the FIRST argument." + + "\n" + + "\n --server Run the QPS server. No default." + + "\n --client Run the QPS client. No default." + ); + } +} diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsClient.java similarity index 83% rename from benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java rename to benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsClient.java index a1a2c1bcc2..f3cced2c3a 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsClient.java @@ -35,8 +35,10 @@ import static grpc.testing.TestServiceGrpc.TestServiceStub; import static grpc.testing.Qpstest.SimpleRequest; import static grpc.testing.Qpstest.SimpleResponse; import static java.lang.Math.max; +import static io.grpc.testing.integration.Util.loadCert; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; import grpc.testing.Qpstest.PayloadType; import grpc.testing.TestServiceGrpc; @@ -47,9 +49,14 @@ import io.grpc.stub.StreamObserver; import io.grpc.transport.netty.NegotiationType; import io.grpc.transport.netty.NettyChannelBuilder; import io.grpc.transport.okhttp.OkHttpChannelBuilder; +import io.netty.handler.ssl.SslContext; import org.HdrHistogram.Histogram; import org.HdrHistogram.HistogramIterationValue; +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CancellationException; @@ -61,15 +68,13 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; -// TODO: Add OkHttp and Netty TLS Support - /** * Runs lots of RPCs against a QPS Server to test for throughput and latency. * It's a Java clone of the C version at * https://github.com/grpc/grpc/blob/master/test/cpp/qps/client.cc */ -public class Client { - private static final Logger log = Logger.getLogger(Client.class.getName()); +public class QpsClient { + private static final Logger log = Logger.getLogger(QpsClient.class.getName()); // Can record values between 1 ns and 1 min (60 BILLION NS) private static final long HISTOGRAM_MAX_VALUE = 60000000000L; @@ -84,13 +89,10 @@ public class Client { private String serverHost = "127.0.0.1"; private int serverPort; private boolean okhttp; + private boolean enableTls; + private boolean useTestCa; - public static void main(String... args) throws Exception { - Client c = new Client(); - c.run(args); - } - - private void run(String[] args) throws Exception { + public void run(String[] args) throws Exception { if (!parseArgs(args)) { return; } @@ -143,14 +145,33 @@ public class Client { } } - private Channel newChannel() { + private Channel newChannel() throws IOException { if (okhttp) { - return OkHttpChannelBuilder.forAddress(serverHost, serverPort).build(); - } else { - return NettyChannelBuilder.forAddress(serverHost, serverPort) - .negotiationType(NegotiationType.PLAINTEXT) - .build(); + if (enableTls) { + throw new IllegalStateException("TLS unsupported with okhttp"); + } + + return OkHttpChannelBuilder.forAddress(serverHost, serverPort) + // TODO(buchgr): Figure out what "server_threads" means in java + .executor(MoreExecutors.newDirectExecutorService()) + .build(); } + + SslContext context = null; + InetAddress address = InetAddress.getByName(serverHost); + NegotiationType negotiationType = enableTls ? NegotiationType.TLS : NegotiationType.PLAINTEXT; + if (enableTls && useTestCa) { + // Force the hostname to match the cert the server uses. + address = InetAddress.getByAddress("foo.test.google.fr", address.getAddress()); + File cert = loadCert("ca.pem"); + context = SslContext.newClientContext(cert); + } + + return NettyChannelBuilder.forAddress(new InetSocketAddress(address, serverPort)) + .executor(MoreExecutors.newDirectExecutorService()) + .negotiationType(negotiationType) + .sslContext(context) + .build(); } private boolean parseArgs(String[] args) { @@ -165,11 +186,11 @@ public class Client { } String[] pair = arg.substring(2).split("=", 2); - if (pair.length < 2) { - continue; - } String key = pair[0]; - String value = pair[1]; + String value = ""; + if (pair.length == 2) { + value = pair[1]; + } if ("client_channels".equals(key)) { clientChannels = max(Integer.parseInt(value), 1); @@ -184,8 +205,14 @@ public class Client { } else if ("server_port".equals(key)) { serverPort = Integer.parseInt(value); hasServerPort = true; - } else if ("transport".equals(key)) { - okhttp = "okhttp".equals(value); + } else if ("okhttp".equals(key)) { + okhttp = true; + } else if ("enable_tls".equals(key)) { + enableTls = true; + } else if ("use_testca".equals(key)) { + useTestCa = true; + } else { + System.err.println("Unrecognized argument '" + key + "'."); } } @@ -204,7 +231,7 @@ public class Client { } private void printUsage() { - Client c = new Client(); + QpsClient c = new QpsClient(); System.out.println( "Usage: [ARGS...]" + "\n" @@ -214,7 +241,8 @@ public class Client { + "\n --client_threads=INT Number of client threads. Default " + c.clientThreads + "\n --num_rpcs=INT Number of RPCs per thread. Default " + c.numRpcs + "\n --payload_size=INT Payload size in bytes. Default " + c.payloadSize - + "\n --transport=(okhttp|netty) The transport to use. Default netty" + + "\n --enable_tls Enable TLS. Default disabled." + + "\n --use_testca Use the provided test certificate for TLS." ); } diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsServer.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsServer.java new file mode 100644 index 0000000000..ac18ffaab6 --- /dev/null +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/QpsServer.java @@ -0,0 +1,246 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.benchmarks.qps; + +import static grpc.testing.Qpstest.StatsRequest; +import static grpc.testing.Qpstest.ServerStats; +import static grpc.testing.Qpstest.Latencies; +import static grpc.testing.Qpstest.StartArgs; +import static grpc.testing.Qpstest.Payload; +import static grpc.testing.Qpstest.PayloadType; +import static grpc.testing.Qpstest.SimpleResponse; +import static grpc.testing.Qpstest.SimpleRequest; +import static grpc.testing.Qpstest.StreamingInputCallResponse; +import static grpc.testing.Qpstest.StreamingOutputCallResponse; +import static grpc.testing.Qpstest.StreamingInputCallRequest; +import static grpc.testing.Qpstest.StreamingOutputCallRequest; +import static java.lang.Math.max; +import static io.grpc.testing.integration.Util.loadCert; +import static io.grpc.testing.integration.Util.pickUnusedPort; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; + +import grpc.testing.Qpstest; +import grpc.testing.TestServiceGrpc; +import io.grpc.ServerImpl; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import io.grpc.transport.netty.NettyServerBuilder; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.handler.ssl.SslContext; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +public class QpsServer { + + private boolean enable_tls; + private int port = 0; + private int serverThreads = 4; + + public void run(String[] args) throws Exception { + if (!parseArgs(args)) { + return; + } + + SslContext sslContext = null; + if (enable_tls) { + System.out.println("Using fake CA for TLS certificate.\n" + + "Run the Java client with --enable_tls --use_testca"); + + File cert = loadCert("server1.pem"); + File key = loadCert("server1.key"); + sslContext = SslContext.newServerContext(cert, key); + } + + if (port == 0) { + port = pickUnusedPort(); + } + + final EventLoopGroup boss = new NioEventLoopGroup(1); + final EventLoopGroup worker = new NioEventLoopGroup(serverThreads); + + final ServerImpl server = NettyServerBuilder + .forPort(port) + .userBossEventLoopGroup(boss) + .workerEventLoopGroup(worker) + .addService(TestServiceGrpc.bindService(new TestServiceImpl())) + .sslContext(sslContext) + // TODO(buchgr): Figure out what "server_threads" means in java + .executor(MoreExecutors.newDirectExecutorService()) + .build(); + server.start(); + + System.out.println("QPS Server started on port " + port); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + System.out.println("QPS Server shutting down"); + server.shutdown(); + server.awaitTerminated(5, TimeUnit.SECONDS); + boss.shutdownGracefully(0, 5, TimeUnit.SECONDS); + worker.shutdownGracefully(0, 5, TimeUnit.SECONDS); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + // This is a hack to remain there until we have figured out how + // we deal with daemon threads, because Netty's EventLoops by default + // use daemon threads the application would just exit immediately. + server.awaitTerminated(100, TimeUnit.DAYS); + } + + private boolean parseArgs(String[] args) { + try { + for (String arg : args) { + if (!arg.startsWith("--")) { + System.err.println("All arguments must start with '--': " + arg); + printUsage(); + return false; + } + + String[] pair = arg.substring(2).split("=", 2); + String key = pair[0]; + String value = ""; + if (pair.length == 2) { + value = pair[1]; + } + + if ("server_threads".equals(key)) { + serverThreads = max(Integer.parseInt(value), 1); + } else if ("port".equals(key)) { + port = Integer.parseInt(value); + } else if ("enable_tls".equals(key)) { + enable_tls = true; + } else { + System.err.println("Unrecognized argument '" + key + "'."); + } + } + } catch (Exception e) { + e.printStackTrace(); + printUsage(); + return false; + } + + return true; + } + + private void printUsage() { + QpsServer s = new QpsServer(); + System.out.println( + "Usage: [ARGS...]" + + "\n" + + "\n --port Port of the server. By default a random port is chosen." + + "\n --server_threads Number of client threads. Default " + s.serverThreads + + "\n --enable_tls Enable TLS. Default disabled." + ); + } + + private static class TestServiceImpl implements TestServiceGrpc.TestService { + + @Override + public void startTest(StartArgs request, StreamObserver responseObserver) { + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public void collectServerStats(StatsRequest request, + StreamObserver responseObserver) { + double nowSeconds = System.currentTimeMillis() / 1000.0; + + ServerStats stats = ServerStats.newBuilder() + .setTimeNow(nowSeconds) + .setTimeUser(0) + .setTimeSystem(0) + .build(); + responseObserver.onValue(stats); + responseObserver.onCompleted(); + } + + @Override + public void unaryCall(SimpleRequest request, + StreamObserver responseObserver) { + if (!request.hasResponseSize()) { + throw Status.INTERNAL.augmentDescription("responseSize required").asRuntimeException(); + } else if (!request.hasResponseType()) { + throw Status.INTERNAL.augmentDescription("responseType required").asRuntimeException(); + } else if (request.getResponseSize() > 0) { + // I just added this condition to mimic the C++ QPS Server behaviour. + if (!PayloadType.COMPRESSABLE.equals(request.getResponseType())) { + throw Status.INTERNAL.augmentDescription("Error creating payload.").asRuntimeException(); + } + + ByteString body = ByteString.copyFrom(new byte[request.getResponseSize()]); + PayloadType type = request.getResponseType(); + + Payload payload = Payload.newBuilder().setType(type).setBody(body).build(); + SimpleResponse response = SimpleResponse.newBuilder().setPayload(payload).build(); + + responseObserver.onValue(response); + responseObserver.onCompleted(); + } else { + responseObserver.onValue(SimpleResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + } + + @Override + public void streamingOutputCall(StreamingOutputCallRequest request, + StreamObserver responseObserver) { + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public StreamObserver + streamingInputCall(StreamObserver responseObserver) { + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public StreamObserver + fullDuplexCall(StreamObserver responseObserver) { + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + + @Override + public StreamObserver + halfDuplexCall(StreamObserver responseObserver) { + throw Status.UNIMPLEMENTED.asRuntimeException(); + } + } +}