Add QPS Server and TLS support to QPS Client.

This commit is contained in:
Jakob Buchgraber 2015-02-12 13:15:33 -08:00 committed by nmittler
parent e26608fe91
commit 6ceb37ae4c
4 changed files with 377 additions and 38 deletions

View File

@ -2,8 +2,8 @@ apply plugin: 'application'
apply plugin: 'protobuf'
description = "gRPC Benchmarks"
mainClassName = "io.grpc.benchmarks.qps.Main"
mainClassName = "io.grpc.benchmarks.qps.Client"
buildscript {
repositories {
@ -14,28 +14,32 @@ buildscript {
}
}
configurations {
alpnboot
}
dependencies {
compile project(':grpc-core'),
project(':grpc-netty'),
project(':grpc-okhttp'),
project(':grpc-stub'),
project(':grpc-testing'),
project(':grpc-integration-testing'),
libraries.junit,
libraries.mockito,
libraries.hdrhistogram
// Determine the correct version of Jetty ALPN boot to use based
// on the Java version.
def alpnboot_prefix = 'org.mortbay.jetty.alpn:alpn-boot:'
def alpnboot_version = '8.1.2.v20141202'
if (JavaVersion.current().ordinal() < JavaVersion.VERSION_1_8.ordinal()) {
alpnboot_version = '7.1.2.v20141202'
}
alpnboot alpnboot_prefix + alpnboot_version
applicationDefaultJvmArgs = ["-Xbootclasspath/p:" + configurations.alpnboot.asPath]
}
protobufCodeGenPlugins = ["java_plugin:$rootDir/compiler/build/binaries/java_pluginExecutable/java_plugin"]
generateProto.dependsOn ':grpc-compiler:java_pluginExecutable'
// Allow execution of test client and server.
task execute(dependsOn: classes, type:JavaExec) {
main = project.hasProperty('mainClass') ? project.mainClass : 'io.grpc.benchmarks.qps.Client'
classpath = sourceSets.main.runtimeClasspath
workingDir = project.rootDir
// If appArgs were provided, set the program arguments.
if (project.hasProperty("appArgs")) {
args = Eval.me(appArgs)
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.benchmarks.qps;
import java.util.Arrays;
public class Main {
public static void main(String... args) throws Exception {
if (args.length == 0) {
printUsage();
} else {
String arg = args[0].substring(2);
if ("server".equals(arg)) {
QpsServer server = new QpsServer();
server.run(Arrays.copyOfRange(args, 1, args.length));
} else if ("client".equals(arg)) {
QpsClient client = new QpsClient();
client.run(Arrays.copyOfRange(args, 1, args.length));
} else {
printUsage();
}
}
}
private static void printUsage() {
System.out.println("Usage: Pass one of the two as the FIRST argument."
+ "\n"
+ "\n --server Run the QPS server. No default."
+ "\n --client Run the QPS client. No default."
);
}
}

View File

@ -35,8 +35,10 @@ import static grpc.testing.TestServiceGrpc.TestServiceStub;
import static grpc.testing.Qpstest.SimpleRequest;
import static grpc.testing.Qpstest.SimpleResponse;
import static java.lang.Math.max;
import static io.grpc.testing.integration.Util.loadCert;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import grpc.testing.Qpstest.PayloadType;
import grpc.testing.TestServiceGrpc;
@ -47,9 +49,14 @@ import io.grpc.stub.StreamObserver;
import io.grpc.transport.netty.NegotiationType;
import io.grpc.transport.netty.NettyChannelBuilder;
import io.grpc.transport.okhttp.OkHttpChannelBuilder;
import io.netty.handler.ssl.SslContext;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramIterationValue;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
@ -61,15 +68,13 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
// TODO: Add OkHttp and Netty TLS Support
/**
* Runs lots of RPCs against a QPS Server to test for throughput and latency.
* It's a Java clone of the C version at
* https://github.com/grpc/grpc/blob/master/test/cpp/qps/client.cc
*/
public class Client {
private static final Logger log = Logger.getLogger(Client.class.getName());
public class QpsClient {
private static final Logger log = Logger.getLogger(QpsClient.class.getName());
// Can record values between 1 ns and 1 min (60 BILLION NS)
private static final long HISTOGRAM_MAX_VALUE = 60000000000L;
@ -84,13 +89,10 @@ public class Client {
private String serverHost = "127.0.0.1";
private int serverPort;
private boolean okhttp;
private boolean enableTls;
private boolean useTestCa;
public static void main(String... args) throws Exception {
Client c = new Client();
c.run(args);
}
private void run(String[] args) throws Exception {
public void run(String[] args) throws Exception {
if (!parseArgs(args)) {
return;
}
@ -143,14 +145,33 @@ public class Client {
}
}
private Channel newChannel() {
private Channel newChannel() throws IOException {
if (okhttp) {
return OkHttpChannelBuilder.forAddress(serverHost, serverPort).build();
} else {
return NettyChannelBuilder.forAddress(serverHost, serverPort)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
if (enableTls) {
throw new IllegalStateException("TLS unsupported with okhttp");
}
return OkHttpChannelBuilder.forAddress(serverHost, serverPort)
// TODO(buchgr): Figure out what "server_threads" means in java
.executor(MoreExecutors.newDirectExecutorService())
.build();
}
SslContext context = null;
InetAddress address = InetAddress.getByName(serverHost);
NegotiationType negotiationType = enableTls ? NegotiationType.TLS : NegotiationType.PLAINTEXT;
if (enableTls && useTestCa) {
// Force the hostname to match the cert the server uses.
address = InetAddress.getByAddress("foo.test.google.fr", address.getAddress());
File cert = loadCert("ca.pem");
context = SslContext.newClientContext(cert);
}
return NettyChannelBuilder.forAddress(new InetSocketAddress(address, serverPort))
.executor(MoreExecutors.newDirectExecutorService())
.negotiationType(negotiationType)
.sslContext(context)
.build();
}
private boolean parseArgs(String[] args) {
@ -165,11 +186,11 @@ public class Client {
}
String[] pair = arg.substring(2).split("=", 2);
if (pair.length < 2) {
continue;
}
String key = pair[0];
String value = pair[1];
String value = "";
if (pair.length == 2) {
value = pair[1];
}
if ("client_channels".equals(key)) {
clientChannels = max(Integer.parseInt(value), 1);
@ -184,8 +205,14 @@ public class Client {
} else if ("server_port".equals(key)) {
serverPort = Integer.parseInt(value);
hasServerPort = true;
} else if ("transport".equals(key)) {
okhttp = "okhttp".equals(value);
} else if ("okhttp".equals(key)) {
okhttp = true;
} else if ("enable_tls".equals(key)) {
enableTls = true;
} else if ("use_testca".equals(key)) {
useTestCa = true;
} else {
System.err.println("Unrecognized argument '" + key + "'.");
}
}
@ -204,7 +231,7 @@ public class Client {
}
private void printUsage() {
Client c = new Client();
QpsClient c = new QpsClient();
System.out.println(
"Usage: [ARGS...]"
+ "\n"
@ -214,7 +241,8 @@ public class Client {
+ "\n --client_threads=INT Number of client threads. Default " + c.clientThreads
+ "\n --num_rpcs=INT Number of RPCs per thread. Default " + c.numRpcs
+ "\n --payload_size=INT Payload size in bytes. Default " + c.payloadSize
+ "\n --transport=(okhttp|netty) The transport to use. Default netty"
+ "\n --enable_tls Enable TLS. Default disabled."
+ "\n --use_testca Use the provided test certificate for TLS."
);
}

View File

@ -0,0 +1,246 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.benchmarks.qps;
import static grpc.testing.Qpstest.StatsRequest;
import static grpc.testing.Qpstest.ServerStats;
import static grpc.testing.Qpstest.Latencies;
import static grpc.testing.Qpstest.StartArgs;
import static grpc.testing.Qpstest.Payload;
import static grpc.testing.Qpstest.PayloadType;
import static grpc.testing.Qpstest.SimpleResponse;
import static grpc.testing.Qpstest.SimpleRequest;
import static grpc.testing.Qpstest.StreamingInputCallResponse;
import static grpc.testing.Qpstest.StreamingOutputCallResponse;
import static grpc.testing.Qpstest.StreamingInputCallRequest;
import static grpc.testing.Qpstest.StreamingOutputCallRequest;
import static java.lang.Math.max;
import static io.grpc.testing.integration.Util.loadCert;
import static io.grpc.testing.integration.Util.pickUnusedPort;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import grpc.testing.Qpstest;
import grpc.testing.TestServiceGrpc;
import io.grpc.ServerImpl;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.grpc.transport.netty.NettyServerBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslContext;
import java.io.File;
import java.util.concurrent.TimeUnit;
public class QpsServer {
private boolean enable_tls;
private int port = 0;
private int serverThreads = 4;
public void run(String[] args) throws Exception {
if (!parseArgs(args)) {
return;
}
SslContext sslContext = null;
if (enable_tls) {
System.out.println("Using fake CA for TLS certificate.\n"
+ "Run the Java client with --enable_tls --use_testca");
File cert = loadCert("server1.pem");
File key = loadCert("server1.key");
sslContext = SslContext.newServerContext(cert, key);
}
if (port == 0) {
port = pickUnusedPort();
}
final EventLoopGroup boss = new NioEventLoopGroup(1);
final EventLoopGroup worker = new NioEventLoopGroup(serverThreads);
final ServerImpl server = NettyServerBuilder
.forPort(port)
.userBossEventLoopGroup(boss)
.workerEventLoopGroup(worker)
.addService(TestServiceGrpc.bindService(new TestServiceImpl()))
.sslContext(sslContext)
// TODO(buchgr): Figure out what "server_threads" means in java
.executor(MoreExecutors.newDirectExecutorService())
.build();
server.start();
System.out.println("QPS Server started on port " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
System.out.println("QPS Server shutting down");
server.shutdown();
server.awaitTerminated(5, TimeUnit.SECONDS);
boss.shutdownGracefully(0, 5, TimeUnit.SECONDS);
worker.shutdownGracefully(0, 5, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// This is a hack to remain there until we have figured out how
// we deal with daemon threads, because Netty's EventLoops by default
// use daemon threads the application would just exit immediately.
server.awaitTerminated(100, TimeUnit.DAYS);
}
private boolean parseArgs(String[] args) {
try {
for (String arg : args) {
if (!arg.startsWith("--")) {
System.err.println("All arguments must start with '--': " + arg);
printUsage();
return false;
}
String[] pair = arg.substring(2).split("=", 2);
String key = pair[0];
String value = "";
if (pair.length == 2) {
value = pair[1];
}
if ("server_threads".equals(key)) {
serverThreads = max(Integer.parseInt(value), 1);
} else if ("port".equals(key)) {
port = Integer.parseInt(value);
} else if ("enable_tls".equals(key)) {
enable_tls = true;
} else {
System.err.println("Unrecognized argument '" + key + "'.");
}
}
} catch (Exception e) {
e.printStackTrace();
printUsage();
return false;
}
return true;
}
private void printUsage() {
QpsServer s = new QpsServer();
System.out.println(
"Usage: [ARGS...]"
+ "\n"
+ "\n --port Port of the server. By default a random port is chosen."
+ "\n --server_threads Number of client threads. Default " + s.serverThreads
+ "\n --enable_tls Enable TLS. Default disabled."
);
}
private static class TestServiceImpl implements TestServiceGrpc.TestService {
@Override
public void startTest(StartArgs request, StreamObserver<Latencies> responseObserver) {
throw Status.UNIMPLEMENTED.asRuntimeException();
}
@Override
public void collectServerStats(StatsRequest request,
StreamObserver<ServerStats> responseObserver) {
double nowSeconds = System.currentTimeMillis() / 1000.0;
ServerStats stats = ServerStats.newBuilder()
.setTimeNow(nowSeconds)
.setTimeUser(0)
.setTimeSystem(0)
.build();
responseObserver.onValue(stats);
responseObserver.onCompleted();
}
@Override
public void unaryCall(SimpleRequest request,
StreamObserver<Qpstest.SimpleResponse> responseObserver) {
if (!request.hasResponseSize()) {
throw Status.INTERNAL.augmentDescription("responseSize required").asRuntimeException();
} else if (!request.hasResponseType()) {
throw Status.INTERNAL.augmentDescription("responseType required").asRuntimeException();
} else if (request.getResponseSize() > 0) {
// I just added this condition to mimic the C++ QPS Server behaviour.
if (!PayloadType.COMPRESSABLE.equals(request.getResponseType())) {
throw Status.INTERNAL.augmentDescription("Error creating payload.").asRuntimeException();
}
ByteString body = ByteString.copyFrom(new byte[request.getResponseSize()]);
PayloadType type = request.getResponseType();
Payload payload = Payload.newBuilder().setType(type).setBody(body).build();
SimpleResponse response = SimpleResponse.newBuilder().setPayload(payload).build();
responseObserver.onValue(response);
responseObserver.onCompleted();
} else {
responseObserver.onValue(SimpleResponse.getDefaultInstance());
responseObserver.onCompleted();
}
}
@Override
public void streamingOutputCall(StreamingOutputCallRequest request,
StreamObserver<StreamingOutputCallResponse> responseObserver) {
throw Status.UNIMPLEMENTED.asRuntimeException();
}
@Override
public StreamObserver<StreamingInputCallRequest>
streamingInputCall(StreamObserver<StreamingInputCallResponse> responseObserver) {
throw Status.UNIMPLEMENTED.asRuntimeException();
}
@Override
public StreamObserver<StreamingOutputCallRequest>
fullDuplexCall(StreamObserver<StreamingOutputCallResponse> responseObserver) {
throw Status.UNIMPLEMENTED.asRuntimeException();
}
@Override
public StreamObserver<StreamingOutputCallRequest>
halfDuplexCall(StreamObserver<StreamingOutputCallResponse> responseObserver) {
throw Status.UNIMPLEMENTED.asRuntimeException();
}
}
}