Add QPS Client to perform throughput and latency tests.

This commit is contained in:
Jakob Buchgraber 2015-02-10 16:01:26 -08:00
parent 127270bd5f
commit 3fd7d0675c
5 changed files with 563 additions and 0 deletions

41
benchmarks/build.gradle Normal file
View File

@ -0,0 +1,41 @@
apply plugin: 'application'
apply plugin: 'protobuf'
description = "gRPC Benchmarks"
mainClassName = "io.grpc.benchmarks.qps.Client"
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath libraries.protobuf_plugin
}
}
dependencies {
compile project(':grpc-core'),
project(':grpc-netty'),
project(':grpc-okhttp'),
project(':grpc-stub'),
project(':grpc-testing'),
libraries.junit,
libraries.mockito,
libraries.hdrhistogram
}
protobufCodeGenPlugins = ["java_plugin:$rootDir/compiler/build/binaries/java_pluginExecutable/java_plugin"]
compileJava.dependsOn = ['generateProto']
// Allow execution of test client and server.
task execute(dependsOn: classes, type:JavaExec) {
main = project.hasProperty('mainClass') ? project.mainClass : 'io.grpc.benchmarks.qps.Client'
classpath = sourceSets.main.runtimeClasspath
workingDir = project.rootDir
// If appArgs were provided, set the program arguments.
if (project.hasProperty("appArgs")) {
args = Eval.me(appArgs)
}
}

View File

@ -0,0 +1,360 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.benchmarks.qps;
import static grpc.testing.TestServiceGrpc.TestServiceStub;
import static grpc.testing.Qpstest.SimpleRequest;
import static grpc.testing.Qpstest.SimpleResponse;
import static java.lang.Math.max;
import com.google.common.base.Preconditions;
import grpc.testing.Qpstest.PayloadType;
import grpc.testing.TestServiceGrpc;
import io.grpc.Channel;
import io.grpc.ChannelImpl;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.grpc.transport.netty.NegotiationType;
import io.grpc.transport.netty.NettyChannelBuilder;
import io.grpc.transport.okhttp.OkHttpChannelBuilder;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramIterationValue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
// TODO: Add OkHttp and Netty TLS Support
/**
* Runs lots of RPCs against a QPS Server to test for throughput and latency.
* It's a Java clone of the C version at
* https://github.com/grpc/grpc/blob/master/test/cpp/qps/client.cc
*/
public class Client {
private static final Logger log = Logger.getLogger(Client.class.getName());
// Can record values between 1 ns and 1 min (60 BILLION NS)
private static final long HISTOGRAM_MAX_VALUE = 60000000000L;
private static final int HISTOGRAM_PRECISION = 3;
// How long (in ns) to do RPCs before it counts
private static final long WARMUP_TIME = 5000000000L;
private int clientChannels = 4;
private int clientThreads = 4;
private int numRpcs = 100000;
private int payloadSize = 1;
private String serverHost = "127.0.0.1";
private int serverPort;
private boolean okhttp;
public static void main(String... args) throws Exception {
Client c = new Client();
c.run(args);
}
private void run(String[] args) throws Exception {
if (!parseArgs(args)) {
return;
}
SimpleRequest req = SimpleRequest.newBuilder()
.setResponseType(PayloadType.COMPRESSABLE)
.setResponseSize(payloadSize)
.build();
List<Channel> channels = new ArrayList<Channel>(clientChannels);
for (int i = 0; i < clientChannels; i++) {
channels.add(newChannel());
}
long warmupEnd = System.nanoTime() + WARMUP_TIME;
do {
doRpcs(channels.get(0), req, 10000).get();
} while (System.nanoTime() < warmupEnd);
long startTime = System.nanoTime();
List<Future<Histogram>> futures = new ArrayList<Future<Histogram>>(clientThreads);
for (int i = 0; i < clientThreads; i++) {
// The channel to thread assignment works a bit different than in the C++ version.
// It's the same for the "interesting cases": clientThreads == clientChannels and
// clientChannels == 1.
// It however doesn't support "cache thrashing" as mentioned in the comments of the
// C++ client. That's because it's my understanding that it doesn't make sense for our API
// as we neither use fixed threads per call nor do we pin them to specific cores.
Channel channel = channels.get(i % clientChannels);
futures.add(doRpcs(channel, req, numRpcs));
}
List<Histogram> histograms = new ArrayList<Histogram>(futures.size());
for (Future<Histogram> future : futures) {
histograms.add(future.get());
}
long elapsedTime = System.nanoTime() - startTime;
Histogram merged = merge(histograms);
assert merged.getTotalCount() == numRpcs * clientThreads;
printStats(merged, elapsedTime);
// shutdown
for (Channel channel : channels) {
((ChannelImpl) channel).shutdown();
}
}
private Channel newChannel() {
if (okhttp) {
return OkHttpChannelBuilder.forAddress(serverHost, serverPort).build();
} else {
return NettyChannelBuilder.forAddress(serverHost, serverPort)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
}
}
private boolean parseArgs(String[] args) {
try {
boolean hasServerPort = false;
for (String arg : args) {
if (!arg.startsWith("--")) {
System.err.println("All arguments must start with '--': " + arg);
printUsage();
return false;
}
String[] pair = arg.substring(2).split("=", 2);
if (pair.length < 2) {
continue;
}
String key = pair[0];
String value = pair[1];
if ("client_channels".equals(key)) {
clientChannels = max(Integer.parseInt(value), 1);
} else if ("client_threads".equals(key)) {
clientThreads = max(Integer.parseInt(value), 1);
} else if ("num_rpcs".equals(key)) {
numRpcs = max(Integer.parseInt(value), 1);
} else if ("payload_size".equals(key)) {
payloadSize = max(Integer.parseInt(value), 0);
} else if ("server_host".equals(key)) {
serverHost = value;
} else if ("server_port".equals(key)) {
serverPort = Integer.parseInt(value);
hasServerPort = true;
} else if ("transport".equals(key)) {
okhttp = "okhttp".equals(value);
}
}
if (!hasServerPort) {
System.err.println("'--server_port' was not specified.");
printUsage();
return false;
}
} catch (Exception e) {
e.printStackTrace();
printUsage();
return false;
}
return true;
}
private void printUsage() {
Client c = new Client();
System.out.println(
"Usage: [ARGS...]"
+ "\n"
+ "\n --server_port=INT Port of the server. Required. No default."
+ "\n --server_host=STR Hostname of the server. Default " + c.serverHost
+ "\n --client_channels=INT Number of client channels. Default " + c.clientChannels
+ "\n --client_threads=INT Number of client threads. Default " + c.clientThreads
+ "\n --num_rpcs=INT Number of RPCs per thread. Default " + c.numRpcs
+ "\n --payload_size=INT Payload size in bytes. Default " + c.payloadSize
+ "\n --transport=(okhttp|netty) The transport to use. Default netty"
);
}
private Future<Histogram> doRpcs(Channel channel,
final SimpleRequest request,
final int numRpcs) {
final TestServiceStub stub = TestServiceGrpc.newStub(channel);
final CountDownLatch remainingRpcs = new CountDownLatch(numRpcs);
final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
final HistogramFuture future = new HistogramFuture(histogram, remainingRpcs);
stub.unaryCall(request, new StreamObserver<SimpleResponse>() {
long lastCall = System.nanoTime();
@Override
public void onValue(SimpleResponse value) {
PayloadType type = value.getPayload().getType();
int actualSize = value.getPayload().getBody().size();
if (!PayloadType.COMPRESSABLE.equals(type)) {
throw new RuntimeException("type was '" + type + "', expected '" +
PayloadType.COMPRESSABLE + "'.");
}
if (payloadSize != actualSize) {
throw new RuntimeException("size was '" + actualSize + "', expected '" +
payloadSize + "'");
}
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
System.err.println("onError called: " + status);
future.cancel(true);
}
@Override
public void onCompleted() {
long now = System.nanoTime();
histogram.recordValue(now - lastCall);
lastCall = now;
remainingRpcs.countDown();
if (remainingRpcs.getCount() > 0) {
stub.unaryCall(request, this);
}
}
});
return future;
}
private Histogram merge(List<Histogram> histograms) {
Histogram merged = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
for (Histogram histogram : histograms) {
for (HistogramIterationValue value : histogram.allValues()) {
long latency = value.getValueIteratedTo();
long count = value.getCountAtValueIteratedTo();
merged.recordValueWithCount(latency, count);
}
}
return merged;
}
private void printStats(Histogram histogram, long elapsedTime) {
double percentiles[] = {50, 90, 95, 99, 99.9, 99.99};
// Generate a comma-separated string of percentiles
StringBuilder header = new StringBuilder();
StringBuilder values = new StringBuilder();
header.append("Threads, Channels, Payload Size, ");
values.append(String.format("%s, %d, %d, ", clientThreads, clientChannels, payloadSize));
for (double percentile : percentiles) {
header.append(percentile).append("%ile").append(", ");
values.append(histogram.getValueAtPercentile(percentile)).append(", ");
}
header.append("QPS");
values.append((histogram.getTotalCount() * 1000000000L) / elapsedTime);
System.out.println(header.toString());
System.out.println(values.toString());
}
private static class HistogramFuture implements Future<Histogram> {
private final Histogram histogram;
private final CountDownLatch latch;
private final AtomicBoolean canceled = new AtomicBoolean();
HistogramFuture(Histogram histogram, CountDownLatch latch) {
Preconditions.checkNotNull(histogram, "histogram");
Preconditions.checkNotNull(histogram, "latch");
this.histogram = histogram;
this.latch = latch;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (latch.getCount() > 0 && canceled.compareAndSet(false, true)) {
while (latch.getCount() > 0) {
latch.countDown();
}
return true;
}
return false;
}
@Override
public boolean isCancelled() {
return canceled.get();
}
@Override
public boolean isDone() {
return latch.getCount() == 0 || canceled.get();
}
@Override
public Histogram get() throws InterruptedException, ExecutionException {
latch.await();
if (canceled.get()) {
throw new CancellationException();
}
return histogram;
}
@Override
public Histogram get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException,
TimeoutException {
throw new UnsupportedOperationException();
}
}
}

View File

@ -0,0 +1,158 @@
// An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses.
syntax = "proto2";
package grpc.testing;
enum PayloadType {
// Compressable text format.
COMPRESSABLE= 1;
// Uncompressable binary format.
UNCOMPRESSABLE = 2;
// Randomly chosen from all other formats defined in this enum.
RANDOM = 3;
}
message StatsRequest {
// run number
optional int32 test_num = 1;
}
message ServerStats {
// wall clock time for timestamp
required double time_now = 1;
// user time used by the server process and threads
required double time_user = 2;
// server time used by the server process and all threads
required double time_system = 3;
// RPC count so far
optional int32 num_rpcs = 4;
}
message Payload {
// The type of data in body.
optional PayloadType type = 1;
// Primary contents of payload.
optional bytes body = 2;
}
message Latencies {
required double l_50 = 1;
required double l_90 = 2;
required double l_99 = 3;
required double l_999 = 4;
}
message StartArgs {
required string server_host = 1;
required int32 server_port = 2;
optional bool enable_ssl = 3 [default = false];
optional int32 client_threads = 4 [default = 1];
optional int32 client_channels = 5 [default = -1];
optional int32 num_rpcs = 6 [default = 1];
optional int32 payload_size = 7 [default = 1];
}
message StartResult {
required Latencies latencies = 1;
required int32 num_rpcs = 2;
required double time_elapsed = 3;
required double time_user = 4;
required double time_system = 5;
}
message SimpleRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
optional PayloadType response_type = 1 [default=COMPRESSABLE];
// Desired payload size in the response from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
optional int32 response_size = 2;
// Optional input payload sent along with the request.
optional Payload payload = 3;
}
message SimpleResponse {
optional Payload payload = 1;
}
message StreamingInputCallRequest {
// Optional input payload sent along with the request.
optional Payload payload = 1;
// Not expecting any payload from the response.
}
message StreamingInputCallResponse {
// Aggregated size of payloads received from the client.
optional int32 aggregated_payload_size = 1;
}
message ResponseParameters {
// Desired payload sizes in responses from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
required int32 size = 1;
// Desired interval between consecutive responses in the response stream in
// microseconds.
required int32 interval_us = 2;
}
message StreamingOutputCallRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, the payload from each response in the stream
// might be of different types. This is to simulate a mixed type of payload
// stream.
optional PayloadType response_type = 1 [default=COMPRESSABLE];
repeated ResponseParameters response_parameters = 2;
// Optional input payload sent along with the request.
optional Payload payload = 3;
}
message StreamingOutputCallResponse {
optional Payload payload = 1;
}
service TestService {
// Start test with specified workload
rpc StartTest(StartArgs) returns (Latencies);
// Collect stats from server, ignore request content
rpc CollectServerStats(StatsRequest) returns (ServerStats);
// One request followed by one response.
// The server returns the client payload as-is.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by a sequence of responses (streamed download).
// The server returns the payload with client desired type and sizes.
rpc StreamingOutputCall(StreamingOutputCallRequest)
returns (stream StreamingOutputCallResponse);
// A sequence of requests followed by one response (streamed upload).
// The server returns the aggregated size of client payload as the result.
rpc StreamingInputCall(stream StreamingInputCallRequest)
returns (StreamingInputCallResponse);
// A sequence of requests with each request served by the server immediately.
// As one request could lead to multiple responses, this interface
// demonstrates the idea of full duplexing.
rpc FullDuplexCall(stream StreamingOutputCallRequest)
returns (stream StreamingOutputCallResponse);
// A sequence of requests followed by a sequence of responses.
// The server buffers all the client requests and then serves them in order. A
// stream of responses are returned to the client when the server starts with
// first request.
rpc HalfDuplexCall(stream StreamingOutputCallRequest)
returns (stream StreamingOutputCallResponse);
}

View File

@ -28,6 +28,8 @@ subprojects {
hpack: 'com.twitter:hpack:0.9.1',
protobuf_plugin: 'ws.antonov.gradle.plugins:gradle-plugin-protobuf:0.9.1',
okhttp: 'com.squareup.okhttp:okhttp:2.2.0',
// used to collect benchmark results
hdrhistogram: 'org.hdrhistogram:HdrHistogram:2.1.4',
// TODO: Unreleased dependencies.
// These must already be installed in the local maven repository.

View File

@ -8,6 +8,7 @@ include ":grpc-testing"
include ":grpc-compiler"
include ":grpc-integration-testing"
include ":grpc-all"
include ":grpc-benchmarks"
project(':grpc-core').projectDir = "$rootDir/core" as File
project(':grpc-stub').projectDir = "$rootDir/stub" as File
@ -18,3 +19,4 @@ project(':grpc-testing').projectDir = "$rootDir/testing" as File
project(':grpc-compiler').projectDir = "$rootDir/compiler" as File
project(':grpc-integration-testing').projectDir = "$rootDir/integration-testing" as File
project(':grpc-all').projectDir = "$rootDir/all" as File
project(':grpc-benchmarks').projectDir = "$rootDir/benchmarks" as File