Adding support for UDS to benchmarks

This commit is contained in:
nmittler 2015-05-18 10:04:27 -07:00
parent eee86b4fbb
commit 43038a514e
11 changed files with 1292 additions and 655 deletions

View File

@ -0,0 +1,258 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.benchmarks.qps;
import static java.lang.Math.max;
import static java.lang.String.CASE_INSENSITIVE_ORDER;
import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
/**
* Abstract base class for all {@link Configuration.Builder}s.
*/
public abstract class AbstractConfigurationBuilder<T extends Configuration>
implements Configuration.Builder<T> {
private static final Param HELP = new Param() {
@Override
public String getName() {
return "help";
}
@Override
public String getType() {
return "";
}
@Override
public String getDescription() {
return "Print this text.";
}
@Override
public boolean isRequired() {
return false;
}
public String getDefaultValue() {
return null;
}
@Override
public void setValue(Configuration config, String value) {
throw new UnsupportedOperationException();
}
};
/**
* A single application parameter supported by this builder.
*/
protected interface Param {
/**
* The name of the parameter as it would appear on the command-line.
*/
String getName();
/**
* A string representation of the parameter type. If not applicable, just returns an empty
* string.
*/
String getType();
/**
* A description of this parameter used when printing usage.
*/
String getDescription();
/**
* The default value used when not set explicitly. Ignored if {@link #isRequired()} is {@code
* true}.
*/
String getDefaultValue();
/**
* Indicates whether or not this parameter is required and must therefore be set before the
* configuration can be successfully built.
*/
boolean isRequired();
/**
* Sets this parameter on the given configuration instance.
*/
void setValue(Configuration config, String value);
}
@Override
public final T build(String[] args) {
T config = newConfiguration();
Map<String, Param> paramMap = getParamMap();
Set<String> appliedParams = new TreeSet<String>(CASE_INSENSITIVE_ORDER);
for (String arg : args) {
if (!arg.startsWith("--")) {
throw new IllegalArgumentException("All arguments must start with '--': " + arg);
}
String[] pair = arg.substring(2).split("=", 2);
String key = pair[0];
String value = "";
if (pair.length == 2) {
value = pair[1];
}
// If help was requested, just throw now to print out the usage.
if (HELP.getName().equalsIgnoreCase(key)) {
throw new IllegalArgumentException("Help requested");
}
Param param = paramMap.get(key);
if (param == null) {
throw new IllegalArgumentException("Unsupported argument: " + key);
}
param.setValue(config, value);
appliedParams.add(key);
}
// Ensure that all required options have been provided.
for (Param param : getParams()) {
if (param.isRequired() && !appliedParams.contains(param.getName())) {
throw new IllegalArgumentException("Missing required option '--"
+ param.getName() + "'.");
}
}
return build0(config);
}
@Override
public final void printUsage() {
System.out.println("Usage: [ARGS...]");
int column1Width = 0;
List<Param> params = new ArrayList<Param>();
params.add(HELP);
params.addAll(getParams());
for (Param param : params) {
column1Width = max(commandLineFlag(param).length(), column1Width);
}
int column1Start = 2;
int column2Start = column1Start + column1Width + 2;
for (Param param : params) {
StringBuilder sb = new StringBuilder();
sb.append(Strings.repeat(" ", column1Start));
sb.append(commandLineFlag(param));
sb.append(Strings.repeat(" ", column2Start - sb.length()));
String message = param.getDescription();
sb.append(wordWrap(message, column2Start, 80));
if (param.isRequired()) {
sb.append(Strings.repeat(" ", column2Start));
sb.append("[Required]\n");
} else if (param.getDefaultValue() != null && !param.getDefaultValue().isEmpty()) {
sb.append(Strings.repeat(" ", column2Start));
sb.append("[Default=" + param.getDefaultValue() + "]\n");
}
System.out.println(sb);
}
System.out.println();
}
/**
* Creates a new configuration instance which will be used as the target for command-line
* arguments.
*/
protected abstract T newConfiguration();
/**
* Returns the valid parameters supported by the configuration.
*/
protected abstract Collection<Param> getParams();
/**
* Called by {@link #build(String[])} after verifying that all required options have been set.
* Performs any final validation and modifications to the configuration. If successful, returns
* the fully built configuration.
*/
protected abstract T build0(T config);
private Map<String, Param> getParamMap() {
Map<String, Param> map = new TreeMap<String, Param>(CASE_INSENSITIVE_ORDER);
for (Param param : getParams()) {
map.put(param.getName(), param);
}
return map;
}
private static String commandLineFlag(Param param) {
String name = param.getName().toLowerCase();
String type = (!param.getType().isEmpty() ? '=' + param.getType() : "");
return "--" + name + type;
}
private static String wordWrap(String text, int startPos, int maxPos) {
StringBuilder builder = new StringBuilder();
int pos = startPos;
String[] parts = text.split("\\n");
boolean isBulleted = parts.length > 1;
for (String part : parts) {
int lineStart = startPos;
while (!part.isEmpty()) {
if (pos < lineStart) {
builder.append(Strings.repeat(" ", lineStart - pos));
pos = lineStart;
}
int maxLength = maxPos - pos;
int length = part.length();
if (length > maxLength) {
length = part.lastIndexOf(' ', maxPos - pos) + 1;
if (length == 0) {
length = part.length();
}
}
builder.append(part.substring(0, length));
part = part.substring(length);
// Wrap to the next line.
builder.append("\n");
pos = 0;
lineStart = isBulleted ? startPos + 2 : startPos;
}
}
return builder.toString();
}
}

View File

@ -31,59 +31,42 @@
package io.grpc.benchmarks.qps;
import static grpc.testing.Qpstest.SimpleRequest;
import static grpc.testing.Qpstest.SimpleResponse;
import static grpc.testing.TestServiceGrpc.TestServiceStub;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.CHANNELS;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.CLIENT_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.CONNECTION_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.DIRECTEXECUTOR;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.DURATION;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.HOST;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.NETTY_NATIVE_TRANSPORT;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.OKHTTP;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.OUTSTANDING_RPCS;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.PORT;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.SAVE_HISTOGRAM;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.SERVER_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.STREAMING_RPCS;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.STREAM_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.TESTCA;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.TLS;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.WARMUP_DURATION;
import static io.grpc.benchmarks.qps.ClientConfiguration.HISTOGRAM_MAX_VALUE;
import static io.grpc.benchmarks.qps.ClientConfiguration.HISTOGRAM_PRECISION;
import static io.grpc.benchmarks.qps.ClientUtil.saveHistogram;
import static io.grpc.testing.integration.Util.loadCert;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CHANNELS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CONNECTION_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DIRECTEXECUTOR;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DURATION;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.OUTSTANDING_RPCS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SAVE_HISTOGRAM;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SERVER_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAMING_RPCS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAM_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TESTCA;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TRANSPORT;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.WARMUP_DURATION;
import static io.grpc.benchmarks.qps.Utils.HISTOGRAM_MAX_VALUE;
import static io.grpc.benchmarks.qps.Utils.HISTOGRAM_PRECISION;
import static io.grpc.benchmarks.qps.Utils.newClientChannel;
import static io.grpc.benchmarks.qps.Utils.saveHistogram;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import grpc.testing.Qpstest.Payload;
import grpc.testing.TestServiceGrpc;
import io.grpc.Channel;
import io.grpc.ChannelImpl;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.grpc.transport.netty.GrpcSslContexts;
import io.grpc.transport.netty.NegotiationType;
import io.grpc.transport.netty.NettyChannelBuilder;
import io.grpc.transport.okhttp.OkHttpChannelBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslProvider;
import io.grpc.testing.Payload;
import io.grpc.testing.SimpleRequest;
import io.grpc.testing.SimpleResponse;
import io.grpc.testing.TestServiceGrpc;
import io.grpc.testing.TestServiceGrpc.TestServiceStub;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramIterationValue;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
@ -115,7 +98,7 @@ public class AsyncClient {
List<Channel> channels = new ArrayList<Channel>(config.channels);
for (int i = 0; i < config.channels; i++) {
channels.add(newChannel());
channels.add(newClientChannel(config));
}
// Do a warmup first. It's the same as the actual benchmark, except that
@ -175,58 +158,6 @@ public class AsyncClient {
return histograms;
}
private Channel newChannel() throws IOException {
if (config.okhttp) {
if (config.tls) {
throw new IllegalStateException("TLS unsupported with okhttp");
}
return OkHttpChannelBuilder
.forAddress(config.host, config.port)
.executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null)
.build();
}
SslContext context = null;
InetAddress address = InetAddress.getByName(config.host);
NegotiationType negotiationType = config.tls ? NegotiationType.TLS : NegotiationType.PLAINTEXT;
if (config.tls && config.testca) {
// Force the hostname to match the cert the server uses.
address = InetAddress.getByAddress("foo.test.google.fr", address.getAddress());
File cert = loadCert("ca.pem");
context = GrpcSslContexts.forClient().trustManager(cert)
.sslProvider(config.nettyNativeTransport ? SslProvider.OPENSSL : SslProvider.JDK)
.build();
}
final EventLoopGroup group;
final Class<? extends io.netty.channel.Channel> channelType;
if (config.nettyNativeTransport) {
try {
// These classes are only available on linux.
Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
@SuppressWarnings("unchecked")
Class<? extends io.netty.channel.Channel> channelClass =
(Class<? extends io.netty.channel.Channel>) Class.forName(
"io.netty.channel.epoll.EpollSocketChannel");
group = (EventLoopGroup) groupClass.newInstance();
channelType = channelClass;
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
group = new NioEventLoopGroup();
channelType = NioSocketChannel.class;
}
return NettyChannelBuilder
.forAddress(new InetSocketAddress(address, config.port))
.eventLoopGroup(group)
.channelType(channelType)
.negotiationType(negotiationType)
.executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null)
.sslContext(context)
.connectionWindowSize(config.connectionWindow)
.streamWindowSize(config.streamWindow)
.build();
}
private Future<Histogram> doRpcs(Channel channel, SimpleRequest request, long endTime) {
switch (config.rpcType) {
case UNARY:
@ -393,13 +324,10 @@ public class AsyncClient {
* checkstyle complains if there is no javadoc comment here.
*/
public static void main(String... args) throws Exception {
ClientConfiguration.Builder configBuilder =
ClientConfiguration.newBuilder()
.addOptions(PORT, HOST, CHANNELS, OUTSTANDING_RPCS)
.addOptions(CLIENT_PAYLOAD, SERVER_PAYLOAD, TLS, TESTCA)
.addOptions(OKHTTP, DURATION, WARMUP_DURATION, DIRECTEXECUTOR)
.addOptions(SAVE_HISTOGRAM, STREAMING_RPCS, CONNECTION_WINDOW)
.addOptions(STREAM_WINDOW, NETTY_NATIVE_TRANSPORT);
ClientConfiguration.Builder configBuilder = ClientConfiguration.newBuilder(
ADDRESS, CHANNELS, OUTSTANDING_RPCS, CLIENT_PAYLOAD, SERVER_PAYLOAD,
TLS, TESTCA, TRANSPORT, DURATION, WARMUP_DURATION, DIRECTEXECUTOR,
SAVE_HISTOGRAM, STREAMING_RPCS, CONNECTION_WINDOW, STREAM_WINDOW);
ClientConfiguration config;
try {
config = configBuilder.build(args);

View File

@ -31,21 +31,19 @@
package io.grpc.benchmarks.qps;
import static grpc.testing.Qpstest.Payload;
import static grpc.testing.Qpstest.PayloadType;
import static grpc.testing.Qpstest.SimpleRequest;
import static grpc.testing.Qpstest.SimpleResponse;
import static io.grpc.testing.integration.Util.loadCert;
import static io.grpc.testing.integration.Util.pickUnusedPort;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import grpc.testing.TestServiceGrpc;
import io.grpc.ServerImpl;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.Payload;
import io.grpc.testing.PayloadType;
import io.grpc.testing.SimpleRequest;
import io.grpc.testing.SimpleResponse;
import io.grpc.testing.TestServiceGrpc;
import io.grpc.transport.netty.GrpcSslContexts;
import io.grpc.transport.netty.NettyServerBuilder;
import io.netty.channel.EventLoopGroup;
@ -56,6 +54,7 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslProvider;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
@ -63,13 +62,6 @@ import java.util.concurrent.TimeUnit;
*/
public class AsyncServer {
private boolean tls;
private int port;
private int connectionWindow = NettyServerBuilder.DEFAULT_CONNECTION_WINDOW_SIZE;
private int streamWindow = NettyServerBuilder.DEFAULT_STREAM_WINDOW_SIZE;
private boolean directExecutor;
private boolean nettyNativeTransport;
/**
* checkstyle complains if there is no javadoc comment here.
*/
@ -79,60 +71,20 @@ public class AsyncServer {
/** Equivalent of "main", but non-static. */
public void run(String[] args) throws Exception {
if (!parseArgs(args)) {
ServerConfiguration.Builder configBuilder = ServerConfiguration.newBuilder();
ServerConfiguration config;
try {
config = configBuilder.build(args);
} catch (Exception e) {
System.out.println(e.getMessage());
configBuilder.printUsage();
return;
}
SslContext sslContext = null;
if (tls) {
System.out.println("Using fake CA for TLS certificate.\n"
+ "Run the Java client with --tls --testca");
File cert = loadCert("server1.pem");
File key = loadCert("server1.key");
sslContext = GrpcSslContexts.forServer(cert, key)
.sslProvider(nettyNativeTransport ? SslProvider.OPENSSL : SslProvider.JDK).build();
}
if (port == 0) {
port = pickUnusedPort();
}
final EventLoopGroup boss;
final EventLoopGroup worker;
final Class<? extends ServerChannel> channelType;
if (nettyNativeTransport) {
try {
// These classes are only available on linux.
Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
@SuppressWarnings("unchecked")
Class<? extends ServerChannel> channelClass = (Class<? extends ServerChannel>)
Class.forName("io.netty.channel.epoll.EpollServerSocketChannel");
boss = (EventLoopGroup) groupClass.newInstance();
worker = (EventLoopGroup) groupClass.newInstance();
channelType = channelClass;
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
boss = new NioEventLoopGroup();
worker = new NioEventLoopGroup();
channelType = NioServerSocketChannel.class;
}
final ServerImpl server = NettyServerBuilder
.forPort(port)
.bossEventLoopGroup(boss)
.workerEventLoopGroup(worker)
.channelType(channelType)
.addService(TestServiceGrpc.bindService(new TestServiceImpl()))
.sslContext(sslContext)
.executor(directExecutor ? MoreExecutors.newDirectExecutorService() : null)
.connectionWindowSize(connectionWindow)
.streamWindowSize(streamWindow)
.build();
final ServerImpl server = newServer(config);
server.start();
System.out.println("QPS Server started on port " + port);
System.out.println("QPS Server started on " + config.address);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
@ -148,66 +100,77 @@ public class AsyncServer {
});
}
private boolean parseArgs(String[] args) {
try {
for (String arg : args) {
if (!arg.startsWith("--")) {
System.err.println("All arguments must start with '--': " + arg);
printUsage();
return false;
}
static ServerImpl newServer(ServerConfiguration config) throws IOException {
SslContext sslContext = null;
if (config.tls) {
System.out.println("Using fake CA for TLS certificate.\n"
+ "Run the Java client with --tls --testca");
String[] pair = arg.substring(2).split("=", 2);
String key = pair[0];
String value = "";
if (pair.length == 2) {
value = pair[1];
}
if ("help".equals(key)) {
printUsage();
return false;
} else if ("port".equals(key)) {
port = Integer.parseInt(value);
} else if ("tls".equals(key)) {
tls = true;
} else if ("directexecutor".equals(key)) {
directExecutor = true;
} else if ("connection_window".equals(key)) {
connectionWindow = Integer.parseInt(value);
} else if ("stream_window".equals(key)) {
streamWindow = Integer.parseInt(value);
} else if ("netty_native_transport".equals(key)) {
nettyNativeTransport = true;
} else {
System.err.println("Unrecognized argument '" + key + "'.");
}
}
} catch (Exception e) {
e.printStackTrace();
printUsage();
return false;
File cert = loadCert("server1.pem");
File key = loadCert("server1.key");
boolean useJdkSsl = config.transport == ServerConfiguration.Transport.NETTY_NIO;
sslContext = GrpcSslContexts.forServer(cert, key)
.sslProvider(useJdkSsl ? SslProvider.JDK : SslProvider.OPENSSL)
.build();
}
return true;
}
final EventLoopGroup boss;
final EventLoopGroup worker;
final Class<? extends ServerChannel> channelType;
switch (config.transport) {
case NETTY_NIO: {
boss = new NioEventLoopGroup();
worker = new NioEventLoopGroup();
channelType = NioServerSocketChannel.class;
break;
}
case NETTY_EPOLL: {
try {
// These classes are only available on linux.
Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
@SuppressWarnings("unchecked")
Class<? extends ServerChannel> channelClass = (Class<? extends ServerChannel>)
Class.forName("io.netty.channel.epoll.EpollServerSocketChannel");
boss = (EventLoopGroup) groupClass.newInstance();
worker = (EventLoopGroup) groupClass.newInstance();
channelType = channelClass;
break;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
case NETTY_UNIX_DOMAIN_SOCKET: {
try {
// These classes are only available on linux.
Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
@SuppressWarnings("unchecked")
Class<? extends ServerChannel> channelClass = (Class<? extends ServerChannel>)
Class.forName("io.netty.channel.epoll.EpollServerDomainSocketChannel");
boss = (EventLoopGroup) groupClass.newInstance();
worker = (EventLoopGroup) groupClass.newInstance();
channelType = channelClass;
break;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
default: {
// Should never get here.
throw new IllegalArgumentException("Unsupported transport: " + config.transport);
}
}
private void printUsage() {
System.out.println(
"Usage: [ARGS...]"
+ "\n"
+ "\n --port=INT Port of the server. Required. No default."
+ "\n --tls Enable TLS. Default disabled."
+ "\n --directexecutor Use a direct executor i.e. execute all RPC"
+ "\n calls directly in Netty's event loop"
+ "\n overhead of a thread pool."
+ "\n --netty_native_transport Whether to use Netty's native transport."
+ "\n Only supported on linux."
+ "\n --connection_window=BYTES The HTTP/2 connection flow control window."
+ "\n Default " + connectionWindow + " byte."
+ "\n --stream_window=BYTES The HTTP/2 per-stream flow control window."
+ "\n Default " + streamWindow + " byte."
);
return NettyServerBuilder
.forAddress(config.address)
.bossEventLoopGroup(boss)
.workerEventLoopGroup(worker)
.channelType(channelType)
.addService(TestServiceGrpc.bindService(new TestServiceImpl()))
.sslContext(sslContext)
.executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null)
.connectionWindowSize(config.connectionWindow)
.streamWindowSize(config.streamWindow)
.build();
}
private static class TestServiceImpl implements TestServiceGrpc.TestService {
@ -243,12 +206,6 @@ public class AsyncServer {
}
private static SimpleResponse buildSimpleResponse(SimpleRequest request) {
if (!request.hasResponseSize()) {
throw Status.INTERNAL.augmentDescription("responseSize required").asRuntimeException();
}
if (!request.hasResponseType()) {
throw Status.INTERNAL.augmentDescription("responseType required").asRuntimeException();
}
if (request.getResponseSize() > 0) {
if (!PayloadType.COMPRESSABLE.equals(request.getResponseType())) {
throw Status.INTERNAL.augmentDescription("Error creating payload.").asRuntimeException();

View File

@ -31,51 +31,50 @@
package io.grpc.benchmarks.qps;
import static grpc.testing.Qpstest.RpcType.STREAMING;
import static grpc.testing.Qpstest.RpcType.UNARY;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.HELP;
import static io.grpc.benchmarks.qps.SocketAddressValidator.INET;
import static io.grpc.benchmarks.qps.SocketAddressValidator.UDS;
import static io.grpc.benchmarks.qps.Utils.parseBoolean;
import static io.grpc.testing.RpcType.STREAMING;
import static io.grpc.testing.RpcType.UNARY;
import static java.lang.Integer.parseInt;
import static java.lang.Math.max;
import static java.util.Arrays.asList;
import com.google.common.base.Strings;
import grpc.testing.Qpstest.PayloadType;
import grpc.testing.Qpstest.RpcType;
import io.grpc.testing.PayloadType;
import io.grpc.testing.RpcType;
import io.grpc.transport.netty.NettyChannelBuilder;
import java.util.HashSet;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
/**
* Configuration options for the client implementations.
* Configuration options for benchmark clients.
*/
class ClientConfiguration {
class ClientConfiguration implements Configuration {
private static final String TESTCA_HOST = "foo.test.google.fr";
private static final ClientConfiguration DEFAULT = new ClientConfiguration();
// The histogram can record values between 1 microsecond and 1 min.
static final long HISTOGRAM_MAX_VALUE = 60000000L;
// Value quantization will be no larger than 1/10^3 = 0.1%.
static final int HISTOGRAM_PRECISION = 3;
boolean okhttp;
Transport transport = Transport.NETTY_NIO;
boolean tls;
boolean testca;
boolean directExecutor;
boolean nettyNativeTransport;
int port;
SocketAddress address;
int channels = 4;
int outstandingRpcsPerChannel = 10;
int serverPayload;
int clientPayload;
int connectionWindow = NettyChannelBuilder.DEFAULT_CONNECTION_WINDOW_SIZE;
int streamWindow = NettyChannelBuilder.DEFAULT_CONNECTION_WINDOW_SIZE;
int streamWindow = NettyChannelBuilder.DEFAULT_STREAM_WINDOW_SIZE;
// seconds
int duration = 60;
// seconds
int warmupDuration = 10;
int targetQps;
String host;
String histogramFile;
RpcType rpcType = UNARY;
PayloadType payloadType = PayloadType.COMPRESSABLE;
@ -83,240 +82,269 @@ class ClientConfiguration {
private ClientConfiguration() {
}
static Builder newBuilder() {
return new Builder();
/**
* Constructs a builder for configuring a client application with supported parameters. If no
* parameters are provided, all parameters are assumed to be supported.
*/
static Builder newBuilder(ClientParam... supportedParams) {
return new Builder(supportedParams);
}
static class Builder {
static class Builder extends AbstractConfigurationBuilder<ClientConfiguration> {
private final Collection<Param> supportedParams;
private final Set<Option> options;
private Builder() {
options = new LinkedHashSet<Option>();
options.add(HELP);
private Builder(ClientParam... supportedParams) {
this.supportedParams = supportedOptionsSet(supportedParams);
}
Builder addOptions(Option... opts) {
options.addAll(asList(opts));
return this;
@Override
protected ClientConfiguration newConfiguration() {
return new ClientConfiguration();
}
void printUsage() {
System.out.println("Usage: [ARGS...]");
int maxWidth = 0;
for (Option option : options) {
maxWidth = max(commandLineFlag(option).length(), maxWidth);
}
// padding
maxWidth += 2;
for (Option option : options) {
StringBuilder sb = new StringBuilder();
sb.append(commandLineFlag(option))
.append(Strings.repeat(" ", maxWidth - sb.length()))
.append(option.description)
.append(option.required ? " Required." : "");
System.out.println(" " + sb);
}
System.out.println();
@Override
protected Collection<Param> getParams() {
return supportedParams;
}
ClientConfiguration build(String[] args) {
ClientConfiguration config = new ClientConfiguration();
Set<Option> appliedOptions = new HashSet<Option>();
for (String arg : args) {
if (!arg.startsWith("--")) {
throw new IllegalArgumentException("All arguments must start with '--': " + arg);
@Override
protected ClientConfiguration build0(ClientConfiguration config) {
if (config.tls) {
if (!config.transport.tlsSupported) {
throw new IllegalArgumentException(
"Transport " + config.transport.name().toLowerCase() + " does not support TLS.");
}
String[] pair = arg.substring(2).split("=", 2);
String key = pair[0];
String value = "";
if (pair.length == 2) {
value = pair[1];
}
for (Option option : options) {
if (key.equals(option.toString())) {
if (option != HELP) {
option.action.applyNew(config, value);
appliedOptions.add(option);
} else {
throw new RuntimeException("");
}
if (config.testca && config.address instanceof InetSocketAddress) {
// Override the socket address with the host from the testca.
try {
InetSocketAddress prevAddress = (InetSocketAddress) config.address;
InetAddress inetAddress = InetAddress.getByName(prevAddress.getHostName());
inetAddress = InetAddress.getByAddress(TESTCA_HOST, inetAddress.getAddress());
config.address = new InetSocketAddress(inetAddress, prevAddress.getPort());
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
}
for (Option option : options) {
if (option.required && !appliedOptions.contains(option)) {
throw new IllegalArgumentException("Missing required option '--" + option + "'.");
}
}
// Verify that the address type is correct for the transport type.
config.transport.validateSocketAddress(config.address);
return config;
}
private static String commandLineFlag(Option option) {
return "--" + option + (option.type != "" ? '=' + option.type : "");
}
private interface Action {
void applyNew(ClientConfiguration config, String value);
}
enum Option {
HELP("", "Print this text.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
}
}),
PORT("INT", "Port of the Server.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.port = parseInt(value);
}
}, true),
HOST("STR", "Hostname or IP Address of the Server.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.host = value;
}
}, true),
CHANNELS("INT", "Number of Channels.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.channels = parseInt(value);
}
}),
OUTSTANDING_RPCS("INT", "Number of outstanding RPCs per Channel.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.outstandingRpcsPerChannel = parseInt(value);
}
}),
CLIENT_PAYLOAD("BYTES", "Payload Size of the Request.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.clientPayload = parseInt(value);
}
}),
SERVER_PAYLOAD("BYTES", "Payload Size of the Response.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.serverPayload = parseInt(value);
}
}),
TLS("", "Enable TLS.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.tls = true;
if (!value.isEmpty()) {
config.tls = Boolean.parseBoolean(value);
}
}
}),
TESTCA("", "Use the provided Test Certificate for TLS.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.testca = true;
if (!value.isEmpty()) {
config.testca = Boolean.parseBoolean(value);
}
}
}),
OKHTTP("", "Use OkHttp as the Transport.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.okhttp = true;
if (!value.isEmpty()) {
config.okhttp = Boolean.parseBoolean(value);
}
}
}),
DURATION("SECONDS", "Duration of the benchmark.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.duration = parseInt(value);
}
}),
WARMUP_DURATION("SECONDS", "Warmup Duration of the benchmark.",
new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.warmupDuration = parseInt(value);
}
}),
DIRECTEXECUTOR("", "Don't use a threadpool for RPC calls, instead execute calls directly "
+ "in the transport thread.",
new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.directExecutor = true;
if (!value.isEmpty()) {
config.directExecutor = Boolean.parseBoolean(value);
}
}
}),
NETTY_NATIVE_TRANSPORT("", "Whether to use Netty's native transport. Only supported when "
+ "using the Netty transport on Linux.",
new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.nettyNativeTransport = true;
if (!value.isEmpty()) {
config.nettyNativeTransport = Boolean.parseBoolean(value);
}
}
}),
SAVE_HISTOGRAM("FILE", "Write the histogram with the latency recordings to file.",
new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.histogramFile = value;
}
}),
STREAMING_RPCS("", "Use Streaming RPCs.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.rpcType = STREAMING;
}
}),
CONNECTION_WINDOW("BYTES", "The HTTP/2 connection flow control window.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.connectionWindow = parseInt(value);
}
}),
STREAM_WINDOW("BYTES", "The HTTP/2 per-stream flow control window.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.streamWindow = parseInt(value);
}
}),
TARGET_QPS("INT", "Average number of QPS to shoot for.", new Action() {
@Override
public void applyNew(ClientConfiguration config, String value) {
config.targetQps = parseInt(value);
}
}, true);
private final String type;
private final String description;
private final Action action;
private final boolean required;
Option(String type, String description, Action action) {
this(type, description, action, false);
}
Option(String type, String description, Action action, boolean required) {
this.type = type;
this.description = description;
this.action = action;
this.required = required;
}
@Override
public String toString() {
return name().toLowerCase();
private static Set<Param> supportedOptionsSet(ClientParam... supportedParams) {
if (supportedParams.length == 0) {
// If no options are supplied, default to including all options.
supportedParams = ClientParam.values();
}
return Collections.unmodifiableSet(new LinkedHashSet<Param>(asList(supportedParams)));
}
}
/**
* All of the supported transports.
*/
enum Transport {
NETTY_NIO(true, "The Netty Java NIO transport. Using this with TLS requires "
+ "that the Java bootclasspath be configured with Jetty ALPN boot.", INET),
NETTY_EPOLL(true, "The Netty native EPOLL transport. Using this with TLS requires that "
+ "OpenSSL be installed and configured as described in "
+ "http://netty.io/wiki/forked-tomcat-native.html. Only supported on Linux.", INET),
NETTY_UNIX_DOMAIN_SOCKET(false, "The Netty Unix Domain Socket transport. This currently "
+ "does not support TLS.", UDS),
OK_HTTP(false, "The OkHttp transport. This currently does not support TLS.", INET);
final boolean tlsSupported;
final String description;
final SocketAddressValidator socketAddressValidator;
Transport(boolean tlsSupported, String description,
SocketAddressValidator socketAddressValidator) {
this.tlsSupported = tlsSupported;
this.description = description;
this.socketAddressValidator = socketAddressValidator;
}
/**
* Validates the given address for this transport.
*
* @throws IllegalArgumentException if the given address is invalid for this transport.
*/
void validateSocketAddress(SocketAddress address) {
if (!socketAddressValidator.isValidSocketAddress(address)) {
throw new IllegalArgumentException(
"Invalid address " + address + " for transport " + this);
}
}
static String getDescriptionString() {
StringBuilder builder = new StringBuilder("Select the transport to use. Options:\n");
boolean first = true;
for (Transport transport : Transport.values()) {
if (!first) {
builder.append("\n");
}
builder.append(transport.name().toLowerCase());
builder.append(": ");
builder.append(transport.description);
first = false;
}
return builder.toString();
}
}
enum ClientParam implements AbstractConfigurationBuilder.Param {
ADDRESS("STR", "Socket address (host:port) or Unix Domain Socket file name "
+ "(unix:///path/to/file), depending on the transport selected.", null, true) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.address = Utils.parseSocketAddress(value);
}
},
CHANNELS("INT", "Number of Channels.", "" + DEFAULT.channels) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.channels = parseInt(value);
}
},
OUTSTANDING_RPCS("INT", "Number of outstanding RPCs per Channel.",
"" + DEFAULT.outstandingRpcsPerChannel) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.outstandingRpcsPerChannel = parseInt(value);
}
},
CLIENT_PAYLOAD("BYTES", "Payload Size of the Request.", "" + DEFAULT.clientPayload) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.clientPayload = parseInt(value);
}
},
SERVER_PAYLOAD("BYTES", "Payload Size of the Response.", "" + DEFAULT.serverPayload) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.serverPayload = parseInt(value);
}
},
TLS("", "Enable TLS.", "" + DEFAULT.tls) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.tls = parseBoolean(value);
}
},
TESTCA("", "Use the provided Test Certificate for TLS.", "" + DEFAULT.testca) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.testca = parseBoolean(value);
}
},
TRANSPORT("STR", Transport.getDescriptionString(), DEFAULT.transport.name().toLowerCase()) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.transport = Transport.valueOf(value.toUpperCase());
}
},
DURATION("SECONDS", "Duration of the benchmark.", "" + DEFAULT.duration) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.duration = parseInt(value);
}
},
WARMUP_DURATION("SECONDS", "Warmup Duration of the benchmark.", "" + DEFAULT.warmupDuration) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.warmupDuration = parseInt(value);
}
},
DIRECTEXECUTOR("",
"Don't use a threadpool for RPC calls, instead execute calls directly "
+ "in the transport thread.", "" + DEFAULT.directExecutor) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.directExecutor = parseBoolean(value);
}
},
SAVE_HISTOGRAM("FILE", "Write the histogram with the latency recordings to file.", null) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.histogramFile = value;
}
},
STREAMING_RPCS("", "Use Streaming RPCs.", "false") {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.rpcType = STREAMING;
}
},
CONNECTION_WINDOW("BYTES", "The HTTP/2 connection flow control window.",
"" + DEFAULT.connectionWindow) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.connectionWindow = parseInt(value);
}
},
STREAM_WINDOW("BYTES", "The HTTP/2 per-stream flow control window.",
"" + DEFAULT.streamWindow) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.streamWindow = parseInt(value);
}
},
TARGET_QPS("INT", "Average number of QPS to shoot for.", "" + DEFAULT.targetQps, true) {
@Override
protected void setClientValue(ClientConfiguration config, String value) {
config.targetQps = parseInt(value);
}
};
private final String type;
private final String description;
private final String defaultValue;
private final boolean required;
ClientParam(String type, String description, String defaultValue) {
this(type, description, defaultValue, false);
}
ClientParam(String type, String description, String defaultValue, boolean required) {
this.type = type;
this.description = description;
this.defaultValue = defaultValue;
this.required = required;
}
@Override
public String getName() {
return name().toLowerCase();
}
@Override
public String getType() {
return type;
}
@Override
public String getDescription() {
return description;
}
@Override
public String getDefaultValue() {
return defaultValue;
}
@Override
public boolean isRequired() {
return required;
}
@Override
public void setValue(Configuration config, String value) {
setClientValue((ClientConfiguration) config, value);
}
protected abstract void setClientValue(ClientConfiguration config, String value);
}
}

View File

@ -1,123 +0,0 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.benchmarks.qps;
import static io.grpc.testing.integration.Util.loadCert;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import grpc.testing.Qpstest;
import grpc.testing.Qpstest.SimpleRequest;
import io.grpc.Channel;
import io.grpc.transport.netty.GrpcSslContexts;
import io.grpc.transport.netty.NegotiationType;
import io.grpc.transport.netty.NettyChannelBuilder;
import io.grpc.transport.okhttp.OkHttpChannelBuilder;
import io.netty.handler.ssl.SslContext;
import org.HdrHistogram.Histogram;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
/**
* Utility methods for client implementations.
*/
final class ClientUtil {
private ClientUtil() {
}
static SimpleRequest newRequest(ClientConfiguration config) {
ByteString body = ByteString.copyFrom(new byte[config.clientPayload]);
Qpstest.Payload payload = Qpstest.Payload.newBuilder()
.setType(config.payloadType)
.setBody(body)
.build();
return SimpleRequest.newBuilder()
.setResponseType(config.payloadType)
.setResponseSize(config.serverPayload)
.setPayload(payload)
.build();
}
static Channel newChannel(ClientConfiguration config) throws IOException {
if (config.okhttp) {
if (config.tls) {
throw new IllegalStateException("TLS unsupported with okhttp");
}
return OkHttpChannelBuilder
.forAddress(config.host, config.port)
.executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null)
.build();
}
SslContext context = null;
InetAddress address = InetAddress.getByName(config.host);
NegotiationType negotiationType = config.tls ? NegotiationType.TLS : NegotiationType.PLAINTEXT;
if (config.tls && config.testca) {
// Force the hostname to match the cert the server uses.
address = InetAddress.getByAddress("foo.test.google.fr", address.getAddress());
File cert = loadCert("ca.pem");
context = GrpcSslContexts.forClient().trustManager(cert).build();
}
return NettyChannelBuilder
.forAddress(new InetSocketAddress(address, config.port))
.negotiationType(negotiationType)
.executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null)
.sslContext(context)
.connectionWindowSize(config.connectionWindow)
.streamWindowSize(config.streamWindow)
.build();
}
static void saveHistogram(Histogram histogram, String filename) throws IOException {
File file;
PrintStream log = null;
try {
file = new File(filename);
if (file.exists()) {
file.delete();
}
log = new PrintStream(new FileOutputStream(file), false);
histogram.outputPercentileDistribution(log, 1.0);
} finally {
if (log != null) {
log.close();
}
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.benchmarks.qps;
/**
* Configuration for a benchmark application.
*/
public interface Configuration {
/**
* Builder for the {@link Configuration}.
* @param <T> The type of {@link Configuration} that this builder creates.
*/
interface Builder<T extends Configuration> {
/**
* Builds the {@link Configuration} from the given command-line arguments.
* @throws IllegalArgumentException if unable to build the configuration for any reason.
*/
T build(String[] args);
/**
* Prints the command-line usage for the application based on the options supported by this
* builder.
*/
void printUsage();
}
}

View File

@ -32,31 +32,32 @@
package io.grpc.benchmarks.qps;
import static com.google.common.base.Preconditions.checkNotNull;
import static grpc.testing.TestServiceGrpc.TestServiceStub;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.CLIENT_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.CONNECTION_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.DURATION;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.HOST;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.OKHTTP;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.PORT;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.SAVE_HISTOGRAM;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.SERVER_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.STREAM_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.TARGET_QPS;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.TESTCA;
import static io.grpc.benchmarks.qps.ClientConfiguration.Builder.Option.TLS;
import static io.grpc.benchmarks.qps.ClientConfiguration.HISTOGRAM_MAX_VALUE;
import static io.grpc.benchmarks.qps.ClientConfiguration.HISTOGRAM_PRECISION;
import static io.grpc.benchmarks.qps.ClientUtil.newRequest;
import static io.grpc.benchmarks.qps.ClientUtil.saveHistogram;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CONNECTION_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DURATION;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SAVE_HISTOGRAM;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.SERVER_PAYLOAD;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.STREAM_WINDOW;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TARGET_QPS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TESTCA;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TRANSPORT;
import static io.grpc.benchmarks.qps.Utils.HISTOGRAM_MAX_VALUE;
import static io.grpc.benchmarks.qps.Utils.HISTOGRAM_PRECISION;
import static io.grpc.benchmarks.qps.Utils.newClientChannel;
import static io.grpc.benchmarks.qps.Utils.newRequest;
import static io.grpc.benchmarks.qps.Utils.saveHistogram;
import grpc.testing.Qpstest;
import grpc.testing.Qpstest.SimpleRequest;
import grpc.testing.TestServiceGrpc;
import io.grpc.Channel;
import io.grpc.ChannelImpl;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.SimpleRequest;
import io.grpc.testing.SimpleResponse;
import io.grpc.testing.TestServiceGrpc;
import io.grpc.testing.TestServiceGrpc.TestServiceStub;
import org.HdrHistogram.AtomicHistogram;
import org.HdrHistogram.Histogram;
@ -83,10 +84,9 @@ public class OpenLoopClient {
* Comment for checkstyle.
*/
public static void main(String... args) throws Exception {
ClientConfiguration.Builder configBuilder =
ClientConfiguration.newBuilder()
.addOptions(PORT, HOST, TARGET_QPS, CLIENT_PAYLOAD, SERVER_PAYLOAD, TLS, TESTCA)
.addOptions(OKHTTP, DURATION, SAVE_HISTOGRAM, CONNECTION_WINDOW, STREAM_WINDOW);
ClientConfiguration.Builder configBuilder = ClientConfiguration.newBuilder(
ADDRESS, TARGET_QPS, CLIENT_PAYLOAD, SERVER_PAYLOAD, TLS,
TESTCA, TRANSPORT, DURATION, SAVE_HISTOGRAM, CONNECTION_WINDOW, STREAM_WINDOW);
ClientConfiguration config;
try {
config = configBuilder.build(args);
@ -108,7 +108,7 @@ public class OpenLoopClient {
}
config.channels = 1;
config.directExecutor = true;
Channel ch = ClientUtil.newChannel(config);
Channel ch = newClientChannel(config);
SimpleRequest req = newRequest(config);
LoadGenerationWorker worker =
new LoadGenerationWorker(ch, req, config.targetQps, config.duration);
@ -198,12 +198,12 @@ public class OpenLoopClient {
}
private void newRpc(TestServiceStub stub) {
stub.unaryCall(request, new StreamObserver<Qpstest.SimpleResponse>() {
stub.unaryCall(request, new StreamObserver<SimpleResponse>() {
private final long start = System.nanoTime();
@Override
public void onValue(Qpstest.SimpleResponse value) {
public void onValue(SimpleResponse value) {
}
@Override

View File

@ -0,0 +1,252 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.benchmarks.qps;
import static io.grpc.benchmarks.qps.SocketAddressValidator.INET;
import static io.grpc.benchmarks.qps.SocketAddressValidator.UDS;
import static io.grpc.benchmarks.qps.Utils.parseBoolean;
import static io.grpc.testing.integration.Util.pickUnusedPort;
import static java.lang.Integer.parseInt;
import io.grpc.transport.netty.NettyChannelBuilder;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* Configuration options for benchmark servers.
*/
class ServerConfiguration implements Configuration {
private static final ServerConfiguration DEFAULT = new ServerConfiguration();
Transport transport = Transport.NETTY_NIO;
boolean tls;
boolean directExecutor;
SocketAddress address;
int connectionWindow = NettyChannelBuilder.DEFAULT_CONNECTION_WINDOW_SIZE;
int streamWindow = NettyChannelBuilder.DEFAULT_STREAM_WINDOW_SIZE;
private ServerConfiguration() {
}
static Builder newBuilder() {
return new Builder();
}
static class Builder extends AbstractConfigurationBuilder<ServerConfiguration> {
private static final List<Param> PARAMS = supportedParams();
private Builder() {
}
@Override
protected ServerConfiguration newConfiguration() {
return new ServerConfiguration();
}
@Override
protected Collection<Param> getParams() {
return PARAMS;
}
@Override
protected ServerConfiguration build0(ServerConfiguration config) {
if (config.tls && !config.transport.tlsSupported) {
throw new IllegalArgumentException(
"TLS unsupported with the " + config.transport.name().toLowerCase() + " transport");
}
// Verify that the address type is correct for the transport type.
config.transport.validateSocketAddress(config.address);
return config;
}
private static List<Param> supportedParams() {
return Collections.unmodifiableList(new ArrayList<Param>(
Arrays.asList(ServerParam.values())));
}
}
/**
* All of the supported transports.
*/
enum Transport {
NETTY_NIO(true, "The Netty Java NIO transport. Using this with TLS requires "
+ "that the Java bootclasspath be configured with Jetty ALPN boot.", INET),
NETTY_EPOLL(true, "The Netty native EPOLL transport. Using this with TLS requires that "
+ "OpenSSL be installed and configured as described in "
+ "http://netty.io/wiki/forked-tomcat-native.html. Only supported on Linux.", INET),
NETTY_UNIX_DOMAIN_SOCKET(false, "The Netty Unix Domain Socket transport. This currently "
+ "does not support TLS.", UDS);
private final boolean tlsSupported;
private final String description;
private final SocketAddressValidator socketAddressValidator;
Transport(boolean tlsSupported, String description,
SocketAddressValidator socketAddressValidator) {
this.tlsSupported = tlsSupported;
this.description = description;
this.socketAddressValidator = socketAddressValidator;
}
/**
* Validates the given address for this transport.
*
* @throws IllegalArgumentException if the given address is invalid for this transport.
*/
void validateSocketAddress(SocketAddress address) {
if (!socketAddressValidator.isValidSocketAddress(address)) {
throw new IllegalArgumentException(
"Invalid address " + address + " for transport " + this);
}
}
static String getDescriptionString() {
StringBuilder builder = new StringBuilder("Select the transport to use. Options:\n");
boolean first = true;
for (Transport transport : Transport.values()) {
if (!first) {
builder.append("\n");
}
builder.append(transport.name().toLowerCase());
builder.append(": ");
builder.append(transport.description);
first = false;
}
return builder.toString();
}
}
enum ServerParam implements AbstractConfigurationBuilder.Param {
ADDRESS("STR", "Socket address (host:port) or Unix Domain Socket file name "
+ "(unix:///path/to/file), depending on the transport selected.", null, true) {
@Override
protected void setServerValue(ServerConfiguration config, String value) {
SocketAddress address = Utils.parseSocketAddress(value);
if (address instanceof InetSocketAddress) {
InetSocketAddress addr = (InetSocketAddress) address;
int port = addr.getPort() == 0 ? pickUnusedPort() : addr.getPort();
// Re-create the address so that the server is available on all local addresses.
address = new InetSocketAddress(port);
}
config.address = address;
}
},
TLS("", "Enable TLS.", "" + DEFAULT.tls) {
@Override
protected void setServerValue(ServerConfiguration config, String value) {
config.tls = parseBoolean(value);
}
},
TRANSPORT("STR", Transport.getDescriptionString(), DEFAULT.transport.name().toLowerCase()) {
@Override
protected void setServerValue(ServerConfiguration config, String value) {
config.transport = Transport.valueOf(value.toUpperCase());
}
},
DIRECTEXECUTOR("", "Don't use a threadpool for RPC calls, instead execute calls directly "
+ "in the transport thread.", "" + DEFAULT.directExecutor) {
@Override
protected void setServerValue(ServerConfiguration config, String value) {
config.directExecutor = parseBoolean(value);
}
},
CONNECTION_WINDOW("BYTES", "The HTTP/2 connection flow control window.",
"" + DEFAULT.connectionWindow) {
@Override
protected void setServerValue(ServerConfiguration config, String value) {
config.connectionWindow = parseInt(value);
}
},
STREAM_WINDOW("BYTES", "The HTTP/2 per-stream flow control window.",
"" + DEFAULT.streamWindow) {
@Override
protected void setServerValue(ServerConfiguration config, String value) {
config.streamWindow = parseInt(value);
}
};
private final String type;
private final String description;
private final String defaultValue;
private final boolean required;
ServerParam(String type, String description, String defaultValue) {
this(type, description, defaultValue, false);
}
ServerParam(String type, String description, String defaultValue, boolean required) {
this.type = type;
this.description = description;
this.defaultValue = defaultValue;
this.required = required;
}
@Override
public String getName() {
return name().toLowerCase();
}
@Override
public String getType() {
return type;
}
@Override
public String getDescription() {
return description;
}
@Override
public String getDefaultValue() {
return defaultValue;
}
@Override
public boolean isRequired() {
return required;
}
@Override
public void setValue(Configuration config, String value) {
setServerValue((ServerConfiguration) config, value);
}
protected abstract void setServerValue(ServerConfiguration config, String value);
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.benchmarks.qps;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
/**
* Verifies whether or not the given {@link SocketAddress} is valid.
*/
interface SocketAddressValidator {
/**
* Verifier for {@link InetSocketAddress}es.
*/
SocketAddressValidator INET = new SocketAddressValidator() {
@Override
public boolean isValidSocketAddress(SocketAddress address) {
return address instanceof InetSocketAddress;
}
};
/**
* Verifier for Netty Unix Domain Socket addresses.
*/
SocketAddressValidator UDS = new SocketAddressValidator() {
@Override
public boolean isValidSocketAddress(SocketAddress address) {
return "DomainSocketAddress".equals(address.getClass().getSimpleName());
}
};
/**
* Returns {@code true} if the given address is valid.
*/
boolean isValidSocketAddress(SocketAddress address);
}

View File

@ -0,0 +1,213 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.benchmarks.qps;
import static io.grpc.testing.integration.Util.loadCert;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.testing.Payload;
import io.grpc.testing.SimpleRequest;
import io.grpc.transport.netty.GrpcSslContexts;
import io.grpc.transport.netty.NegotiationType;
import io.grpc.transport.netty.NettyChannelBuilder;
import io.grpc.transport.okhttp.OkHttpChannelBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslProvider;
import org.HdrHistogram.Histogram;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
/**
* Utility methods to support benchmarking classes.
*/
final class Utils {
private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://";
// The histogram can record values between 1 microsecond and 1 min.
static final long HISTOGRAM_MAX_VALUE = 60000000L;
// Value quantization will be no larger than 1/10^3 = 0.1%.
static final int HISTOGRAM_PRECISION = 3;
private Utils() {
}
static boolean parseBoolean(String value) {
return value.isEmpty() || Boolean.parseBoolean(value);
}
static SocketAddress parseSocketAddress(String value) {
if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
// Unix Domain Socket address.
try {
// Create the underlying file for the Unix Domain Socket.
String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length());
File file = new File(filePath);
if (!file.isAbsolute()) {
throw new IllegalArgumentException("File path must be absolute: " + filePath);
}
if (file.createNewFile()) {
// If this application created the file, delete it when the application exits.
file.deleteOnExit();
}
// Create the SocketAddress referencing the file.
Class<?> addressClass = Class.forName("io.netty.channel.unix.DomainSocketAddress");
return (SocketAddress) addressClass.getDeclaredConstructor(File.class)
.newInstance(file);
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
// Standard TCP/IP address.
String[] parts = value.split(":");
String host = parts[0];
int port = Integer.parseInt(parts[1]);
return new InetSocketAddress(host, port);
}
}
static SimpleRequest newRequest(ClientConfiguration config) {
ByteString body = ByteString.copyFrom(new byte[config.clientPayload]);
Payload payload = Payload.newBuilder()
.setType(config.payloadType)
.setBody(body)
.build();
return SimpleRequest.newBuilder()
.setResponseType(config.payloadType)
.setResponseSize(config.serverPayload)
.setPayload(payload)
.build();
}
static Channel newClientChannel(ClientConfiguration config) throws IOException {
if (config.transport == ClientConfiguration.Transport.OK_HTTP) {
InetSocketAddress addr = (InetSocketAddress) config.address;
return OkHttpChannelBuilder
.forAddress(addr.getHostName(), addr.getPort())
.executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null)
.build();
}
// It's a Netty transport.
SslContext context = null;
NegotiationType negotiationType = config.tls ? NegotiationType.TLS : NegotiationType.PLAINTEXT;
if (config.tls && config.testca) {
File cert = loadCert("ca.pem");
boolean useJdkSsl = config.transport == ClientConfiguration.Transport.NETTY_NIO;
context = GrpcSslContexts.forClient().trustManager(cert)
.sslProvider(useJdkSsl ? SslProvider.JDK : SslProvider.OPENSSL)
.build();
}
final EventLoopGroup group;
final Class<? extends io.netty.channel.Channel> channelType;
switch (config.transport) {
case NETTY_NIO: {
group = new NioEventLoopGroup();
channelType = NioSocketChannel.class;
break;
}
case NETTY_EPOLL: {
try {
// These classes are only available on linux.
Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
@SuppressWarnings("unchecked")
Class<? extends io.netty.channel.Channel> channelClass =
(Class<? extends io.netty.channel.Channel>) Class.forName(
"io.netty.channel.epoll.EpollSocketChannel");
group = (EventLoopGroup) groupClass.newInstance();
channelType = channelClass;
break;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
case NETTY_UNIX_DOMAIN_SOCKET: {
try {
// These classes are only available on linux.
Class<?> groupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
@SuppressWarnings("unchecked")
Class<? extends io.netty.channel.Channel> channelClass =
(Class<? extends io.netty.channel.Channel>) Class.forName(
"io.netty.channel.epoll.EpollDomainSocketChannel");
group = (EventLoopGroup) groupClass.newInstance();
channelType = channelClass;
break;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
default: {
// Should never get here.
throw new IllegalArgumentException("Unsupported transport: " + config.transport);
}
}
return NettyChannelBuilder
.forAddress(config.address)
.eventLoopGroup(group)
.channelType(channelType)
.negotiationType(negotiationType)
.executor(config.directExecutor ? MoreExecutors.newDirectExecutorService() : null)
.sslContext(context)
.connectionWindowSize(config.connectionWindow)
.streamWindowSize(config.streamWindow)
.build();
}
static void saveHistogram(Histogram histogram, String filename) throws IOException {
File file;
PrintStream log = null;
try {
file = new File(filename);
if (file.exists() && !file.delete()) {
System.err.println("Failed deleting previous histogram file: " + file.getAbsolutePath());
}
log = new PrintStream(new FileOutputStream(file), false);
histogram.outputPercentileDistribution(log, 1.0);
} finally {
if (log != null) {
log.close();
}
}
}
}

View File

@ -30,19 +30,23 @@
// An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses.
syntax = "proto2";
syntax = "proto3";
package grpc.testing;
option java_multiple_files = true;
option java_package = "io.grpc.testing";
option java_outer_classname = "QpsTestProto";
enum PayloadType {
// Compressable text format.
COMPRESSABLE= 1;
COMPRESSABLE = 0;
// Uncompressable binary format.
UNCOMPRESSABLE = 2;
UNCOMPRESSABLE = 1;
// Randomly chosen from all other formats defined in this enum.
RANDOM = 3;
RANDOM = 2;
}
message StatsRequest {
@ -52,56 +56,56 @@ message StatsRequest {
message ServerStats {
// wall clock time
required double time_elapsed = 1;
double time_elapsed = 1;
// user time used by the server process and threads
required double time_user = 2;
double time_user = 2;
// server time used by the server process and all threads
required double time_system = 3;
double time_system = 3;
}
message Payload {
// The type of data in body.
optional PayloadType type = 1;
PayloadType type = 1;
// Primary contents of payload.
optional bytes body = 2;
bytes body = 2;
}
message HistogramData {
repeated uint32 bucket = 1;
required double min_seen = 2;
required double max_seen = 3;
required double sum = 4;
required double sum_of_squares = 5;
required double count = 6;
double min_seen = 2;
double max_seen = 3;
double sum = 4;
double sum_of_squares = 5;
double count = 6;
}
enum ClientType {
SYNCHRONOUS_CLIENT = 1;
ASYNC_CLIENT = 2;
SYNCHRONOUS_CLIENT = 0;
ASYNC_CLIENT = 1;
}
enum ServerType {
SYNCHRONOUS_SERVER = 1;
ASYNC_SERVER = 2;
SYNCHRONOUS_SERVER = 0;
ASYNC_SERVER = 1;
}
enum RpcType {
UNARY = 1;
STREAMING = 2;
UNARY = 0;
STREAMING = 1;
}
message ClientConfig {
repeated string server_targets = 1;
required ClientType client_type = 2;
optional bool enable_ssl = 3 [default=false];
required int32 outstanding_rpcs_per_channel = 4;
required int32 client_channels = 5;
required int32 payload_size = 6;
ClientType client_type = 2;
bool enable_ssl = 3;
int32 outstanding_rpcs_per_channel = 4;
int32 client_channels = 5;
int32 payload_size = 6;
// only for async client:
optional int32 async_client_threads = 7;
optional RpcType rpc_type = 8 [default=UNARY];
int32 async_client_threads = 7;
RpcType rpc_type = 8;
}
// Request current stats
@ -115,20 +119,20 @@ message ClientArgs {
}
message ClientStats {
required HistogramData latencies = 1;
required double time_elapsed = 3;
required double time_user = 4;
required double time_system = 5;
HistogramData latencies = 1;
double time_elapsed = 3;
double time_user = 4;
double time_system = 5;
}
message ClientStatus {
optional ClientStats stats = 1;
ClientStats stats = 1;
}
message ServerConfig {
required ServerType server_type = 1;
optional int32 threads = 2 [default=1];
optional bool enable_ssl = 3 [default=false];
ServerType server_type = 1;
int32 threads = 2;
bool enable_ssl = 3;
}
message ServerArgs {
@ -139,25 +143,25 @@ message ServerArgs {
}
message ServerStatus {
optional ServerStats stats = 1;
required int32 port = 2;
ServerStats stats = 1;
int32 port = 2;
}
message SimpleRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
optional PayloadType response_type = 1 [default=COMPRESSABLE];
PayloadType response_type = 1;
// Desired payload size in the response from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
optional int32 response_size = 2 [default=0];
int32 response_size = 2;
// Optional input payload sent along with the request.
optional Payload payload = 3;
Payload payload = 3;
}
message SimpleResponse {
optional Payload payload = 1;
Payload payload = 1;
}
service TestService {