mirror of https://github.com/grpc/grpc-java.git
Implement the load worker that can receive control events from the load driver and initiate load testing scenarios.
Will be used for GRPC's continuous load testing process.
This commit is contained in:
parent
357878d2d6
commit
a7049bca3b
|
|
@ -13,6 +13,7 @@ buildscript {
|
|||
}
|
||||
}
|
||||
dependencies {
|
||||
classpath libraries.math
|
||||
classpath libraries.protobuf_plugin
|
||||
classpath "gradle.plugin.me.champeau.gradle:jmh-gradle-plugin:0.3.0"
|
||||
}
|
||||
|
|
@ -24,7 +25,7 @@ jmh {
|
|||
warmupIterations = 10
|
||||
iterations = 10
|
||||
fork = 1
|
||||
jvmArgs = "-server -Xms2g -Xmx2g -dsa -da -XX:+AggressiveOpts -XX:+UseBiasedLocking -XX:+UseFastAccessorMethods -XX:+OptimizeStringConcat"
|
||||
jvmArgs = "-server -Xms2g -Xmx2g"
|
||||
}
|
||||
|
||||
dependencies {
|
||||
|
|
@ -38,15 +39,22 @@ dependencies {
|
|||
libraries.mockito,
|
||||
libraries.hdrhistogram,
|
||||
libraries.netty_tcnative,
|
||||
libraries.netty_epoll
|
||||
libraries.netty_epoll,
|
||||
libraries.math
|
||||
}
|
||||
|
||||
configureProtoCompilation()
|
||||
|
||||
def vmArgs = [
|
||||
"-server",
|
||||
"-Xms2g",
|
||||
"-Xmx2g"
|
||||
]
|
||||
|
||||
task qps_client(type: CreateStartScripts) {
|
||||
mainClassName = "io.grpc.benchmarks.qps.AsyncClient"
|
||||
applicationName = "qps_client"
|
||||
defaultJvmOpts = ["-Xbootclasspath/p:" + configurations.alpnboot.asPath]
|
||||
defaultJvmOpts = ["-Xbootclasspath/p:" + configurations.alpnboot.asPath] + vmArgs
|
||||
outputDir = new File(project.buildDir, 'tmp')
|
||||
classpath = jar.outputs.files + project.configurations.runtime
|
||||
}
|
||||
|
|
@ -54,7 +62,7 @@ task qps_client(type: CreateStartScripts) {
|
|||
task openloop_client(type: CreateStartScripts) {
|
||||
mainClassName = "io.grpc.benchmarks.qps.OpenLoopClient"
|
||||
applicationName = "openloop_client"
|
||||
defaultJvmOpts = ["-Xbootclasspath/p:" + configurations.alpnboot.asPath]
|
||||
defaultJvmOpts = ["-Xbootclasspath/p:" + configurations.alpnboot.asPath] + vmArgs
|
||||
outputDir = new File(project.buildDir, 'tmp')
|
||||
classpath = jar.outputs.files + project.configurations.runtime
|
||||
}
|
||||
|
|
@ -62,7 +70,15 @@ task openloop_client(type: CreateStartScripts) {
|
|||
task qps_server(type: CreateStartScripts) {
|
||||
mainClassName = "io.grpc.benchmarks.qps.AsyncServer"
|
||||
applicationName = "qps_server"
|
||||
defaultJvmOpts = ["-Xbootclasspath/p:" + configurations.alpnboot.asPath]
|
||||
defaultJvmOpts = ["-Xbootclasspath/p:" + configurations.alpnboot.asPath] + vmArgs
|
||||
outputDir = new File(project.buildDir, 'tmp')
|
||||
classpath = jar.outputs.files + project.configurations.runtime
|
||||
}
|
||||
|
||||
task benchmark_worker(type: CreateStartScripts) {
|
||||
mainClassName = "io.grpc.benchmarks.driver.LoadWorker"
|
||||
applicationName = "benchmark_worker"
|
||||
defaultJvmOpts = ["-Xbootclasspath/p:" + configurations.alpnboot.asPath] + vmArgs
|
||||
outputDir = new File(project.buildDir, 'tmp')
|
||||
classpath = jar.outputs.files + project.configurations.runtime
|
||||
}
|
||||
|
|
@ -71,6 +87,7 @@ applicationDistribution.into("bin") {
|
|||
from(qps_client)
|
||||
from(openloop_client)
|
||||
from(qps_server)
|
||||
from(benchmark_worker)
|
||||
fileMode = 0755
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -33,8 +33,6 @@ package io.grpc.benchmarks.netty;
|
|||
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.ClientCall;
|
||||
import io.grpc.Drainable;
|
||||
import io.grpc.KnownLength;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
|
|
@ -44,22 +42,19 @@ import io.grpc.ServerCall;
|
|||
import io.grpc.ServerCallHandler;
|
||||
import io.grpc.ServerServiceDefinition;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.benchmarks.ByteBufOutputMarshaller;
|
||||
import io.grpc.netty.NegotiationType;
|
||||
import io.grpc.netty.NettyChannelBuilder;
|
||||
import io.grpc.netty.NettyServerBuilder;
|
||||
import io.grpc.stub.ClientCalls;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.EmptyByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.channel.local.LocalAddress;
|
||||
import io.netty.channel.local.LocalChannel;
|
||||
import io.netty.channel.local.LocalServerChannel;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.NetworkInterface;
|
||||
|
|
@ -520,66 +515,4 @@ public abstract class AbstractBenchmark {
|
|||
server.shutdown().awaitTermination(5, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple {@link io.grpc.MethodDescriptor.Marshaller} for Netty ByteBuf.
|
||||
*/
|
||||
protected static class ByteBufOutputMarshaller implements MethodDescriptor.Marshaller<ByteBuf> {
|
||||
|
||||
public static final EmptyByteBuf EMPTY_BYTE_BUF =
|
||||
new EmptyByteBuf(PooledByteBufAllocator.DEFAULT);
|
||||
|
||||
protected ByteBufOutputMarshaller() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream stream(ByteBuf value) {
|
||||
return new ByteBufInputStream(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf parse(InputStream stream) {
|
||||
try {
|
||||
// We don't do anything with the message and it's already been read into buffers
|
||||
// so just skip copying it.
|
||||
stream.skip(stream.available());
|
||||
return EMPTY_BYTE_BUF;
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException(ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A {@link Drainable} {@code InputStream} that reads an {@link io.netty.buffer.ByteBuf}.
|
||||
*/
|
||||
private static class ByteBufInputStream extends InputStream
|
||||
implements Drainable, KnownLength {
|
||||
|
||||
private ByteBuf buf;
|
||||
|
||||
private ByteBufInputStream(ByteBuf buf) {
|
||||
this.buf = buf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int drainTo(OutputStream target) throws IOException {
|
||||
int readbableBytes = buf.readableBytes();
|
||||
buf.readBytes(target, readbableBytes);
|
||||
buf = null;
|
||||
return readbableBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int available() throws IOException {
|
||||
if (buf != null) {
|
||||
return buf.readableBytes();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Copyright 2016, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.benchmarks;
|
||||
|
||||
import io.grpc.Drainable;
|
||||
import io.grpc.KnownLength;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
/**
|
||||
* A {@link Drainable} {@code InputStream} that reads an {@link ByteBuf}.
|
||||
*/
|
||||
public class ByteBufInputStream extends InputStream
|
||||
implements Drainable, KnownLength {
|
||||
|
||||
private ByteBuf buf;
|
||||
|
||||
ByteBufInputStream(ByteBuf buf) {
|
||||
this.buf = buf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int drainTo(OutputStream target) throws IOException {
|
||||
int readableBytes = buf.readableBytes();
|
||||
buf.readBytes(target, readableBytes);
|
||||
buf = null;
|
||||
return readableBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int available() throws IOException {
|
||||
if (buf != null) {
|
||||
return buf.readableBytes();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright 2016, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.benchmarks;
|
||||
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.EmptyByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
/**
|
||||
* Simple {@link MethodDescriptor.Marshaller} for Netty's {@link ByteBuf}.
|
||||
*/
|
||||
public class ByteBufOutputMarshaller implements MethodDescriptor.Marshaller<ByteBuf> {
|
||||
|
||||
public static final EmptyByteBuf EMPTY_BYTE_BUF =
|
||||
new EmptyByteBuf(PooledByteBufAllocator.DEFAULT);
|
||||
|
||||
@Override
|
||||
public InputStream stream(ByteBuf value) {
|
||||
return new ByteBufInputStream(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf parse(InputStream stream) {
|
||||
try {
|
||||
// We don't do anything with the message and it's already been read into buffers
|
||||
// so just skip copying it.
|
||||
stream.skip(stream.available());
|
||||
return EMPTY_BYTE_BUF;
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException(ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -29,7 +29,7 @@
|
|||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.benchmarks.qps;
|
||||
package io.grpc.benchmarks;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
|
|
@ -37,7 +37,7 @@ import java.net.SocketAddress;
|
|||
/**
|
||||
* Verifies whether or not the given {@link SocketAddress} is valid.
|
||||
*/
|
||||
interface SocketAddressValidator {
|
||||
public interface SocketAddressValidator {
|
||||
/**
|
||||
* Verifier for {@link InetSocketAddress}es.
|
||||
*/
|
||||
|
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
* Copyright 2016, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.benchmarks;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
|
||||
/**
|
||||
* All of the supported transports.
|
||||
*/
|
||||
public enum Transport {
|
||||
NETTY_NIO(true, "The Netty Java NIO transport. Using this with TLS requires "
|
||||
+ "that the Java bootclasspath be configured with Jetty ALPN boot.",
|
||||
SocketAddressValidator.INET),
|
||||
NETTY_EPOLL(true, "The Netty native EPOLL transport. Using this with TLS requires that "
|
||||
+ "OpenSSL be installed and configured as described in "
|
||||
+ "http://netty.io/wiki/forked-tomcat-native.html. Only supported on Linux.",
|
||||
SocketAddressValidator.INET),
|
||||
NETTY_UNIX_DOMAIN_SOCKET(false, "The Netty Unix Domain Socket transport. This currently "
|
||||
+ "does not support TLS.", SocketAddressValidator.UDS),
|
||||
OK_HTTP(true, "The OkHttp transport.", SocketAddressValidator.INET);
|
||||
|
||||
public final boolean tlsSupported;
|
||||
final String description;
|
||||
final SocketAddressValidator socketAddressValidator;
|
||||
|
||||
Transport(boolean tlsSupported, String description,
|
||||
SocketAddressValidator socketAddressValidator) {
|
||||
this.tlsSupported = tlsSupported;
|
||||
this.description = description;
|
||||
this.socketAddressValidator = socketAddressValidator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the given address for this transport.
|
||||
*
|
||||
* @throws IllegalArgumentException if the given address is invalid for this transport.
|
||||
*/
|
||||
public void validateSocketAddress(SocketAddress address) {
|
||||
if (!socketAddressValidator.isValidSocketAddress(address)) {
|
||||
throw new IllegalArgumentException(
|
||||
"Invalid address " + address + " for transport " + this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Describe the {@link Transport}.
|
||||
*/
|
||||
public static String getDescriptionString() {
|
||||
StringBuilder builder = new StringBuilder("Select the transport to use. Options:\n");
|
||||
boolean first = true;
|
||||
for (Transport transport : Transport.values()) {
|
||||
if (!first) {
|
||||
builder.append("\n");
|
||||
}
|
||||
builder.append(transport.name().toLowerCase());
|
||||
builder.append(": ");
|
||||
builder.append(transport.description);
|
||||
first = false;
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
||||
|
|
@ -29,12 +29,16 @@
|
|||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.benchmarks.qps;
|
||||
package io.grpc.benchmarks;
|
||||
|
||||
import static io.grpc.benchmarks.proto.Messages.SimpleResponse;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.benchmarks.proto.Messages;
|
||||
import io.grpc.benchmarks.proto.Messages.Payload;
|
||||
import io.grpc.benchmarks.proto.Messages.SimpleRequest;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
|
|
@ -64,27 +68,33 @@ import java.net.InetSocketAddress;
|
|||
import java.net.SocketAddress;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
|
||||
/**
|
||||
* Utility methods to support benchmarking classes.
|
||||
*/
|
||||
final class Utils {
|
||||
public final class Utils {
|
||||
private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://";
|
||||
|
||||
// The histogram can record values between 1 microsecond and 1 min.
|
||||
static final long HISTOGRAM_MAX_VALUE = 60000000L;
|
||||
public static final long HISTOGRAM_MAX_VALUE = 60000000L;
|
||||
// Value quantization will be no larger than 1/10^3 = 0.1%.
|
||||
static final int HISTOGRAM_PRECISION = 3;
|
||||
public static final int HISTOGRAM_PRECISION = 3;
|
||||
|
||||
public static int DEFAULT_FLOW_CONTROL_WINDOW = NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW;
|
||||
|
||||
private Utils() {
|
||||
}
|
||||
|
||||
static boolean parseBoolean(String value) {
|
||||
public static boolean parseBoolean(String value) {
|
||||
return value.isEmpty() || Boolean.parseBoolean(value);
|
||||
}
|
||||
|
||||
static SocketAddress parseSocketAddress(String value) {
|
||||
/**
|
||||
* Parse a {@link SocketAddress} from the given string.
|
||||
*/
|
||||
public static SocketAddress parseSocketAddress(String value) {
|
||||
if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
|
||||
// Unix Domain Socket address.
|
||||
// Create the underlying file for the Unix Domain Socket.
|
||||
|
|
@ -116,35 +126,26 @@ final class Utils {
|
|||
}
|
||||
}
|
||||
|
||||
static SimpleRequest newRequest(ClientConfiguration config) {
|
||||
ByteString body = ByteString.copyFrom(new byte[config.clientPayload]);
|
||||
Payload payload = Payload.newBuilder()
|
||||
.setType(config.payloadType)
|
||||
.setBody(body)
|
||||
.build();
|
||||
|
||||
return SimpleRequest.newBuilder()
|
||||
.setResponseType(config.payloadType)
|
||||
.setResponseSize(config.serverPayload)
|
||||
.setPayload(payload)
|
||||
.build();
|
||||
}
|
||||
|
||||
static ManagedChannel newClientChannel(ClientConfiguration config) throws IOException {
|
||||
if (config.transport == ClientConfiguration.Transport.OK_HTTP) {
|
||||
InetSocketAddress addr = (InetSocketAddress) config.address;
|
||||
/**
|
||||
* Create a {@link ManagedChannel} for the given parameters.
|
||||
*/
|
||||
public static ManagedChannel newClientChannel(Transport transport, SocketAddress address,
|
||||
boolean tls, boolean testca, @Nullable String authorityOverride, boolean useDefaultCiphers,
|
||||
int flowControlWindow, boolean directExecutor) throws IOException {
|
||||
if (transport == Transport.OK_HTTP) {
|
||||
InetSocketAddress addr = (InetSocketAddress) address;
|
||||
OkHttpChannelBuilder builder = OkHttpChannelBuilder
|
||||
.forAddress(addr.getHostName(), addr.getPort());
|
||||
if (config.directExecutor) {
|
||||
if (directExecutor) {
|
||||
builder.directExecutor();
|
||||
}
|
||||
builder.negotiationType(config.tls ? io.grpc.okhttp.NegotiationType.TLS
|
||||
builder.negotiationType(tls ? io.grpc.okhttp.NegotiationType.TLS
|
||||
: io.grpc.okhttp.NegotiationType.PLAINTEXT);
|
||||
if (config.tls) {
|
||||
if (tls) {
|
||||
SSLSocketFactory factory;
|
||||
if (config.testca) {
|
||||
if (testca) {
|
||||
builder.overrideAuthority(
|
||||
GrpcUtil.authorityFromHostAndPort(TestUtils.TEST_SERVER_HOST, addr.getPort()));
|
||||
GrpcUtil.authorityFromHostAndPort(authorityOverride, addr.getPort()));
|
||||
try {
|
||||
factory = TestUtils.newSslSocketFactoryForCa(TestUtils.loadCert("ca.pem"));
|
||||
} catch (Exception e) {
|
||||
|
|
@ -155,22 +156,25 @@ final class Utils {
|
|||
}
|
||||
builder.sslSocketFactory(factory);
|
||||
}
|
||||
if (authorityOverride != null) {
|
||||
builder.overrideAuthority(authorityOverride);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
// It's a Netty transport.
|
||||
SslContext sslContext = null;
|
||||
NegotiationType negotiationType = config.tls ? NegotiationType.TLS : NegotiationType.PLAINTEXT;
|
||||
if (config.tls && config.testca) {
|
||||
NegotiationType negotiationType = tls ? NegotiationType.TLS : NegotiationType.PLAINTEXT;
|
||||
if (tls && testca) {
|
||||
File cert = TestUtils.loadCert("ca.pem");
|
||||
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient().trustManager(cert);
|
||||
if (config.transport == ClientConfiguration.Transport.NETTY_NIO) {
|
||||
if (transport == Transport.NETTY_NIO) {
|
||||
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, SslProvider.JDK);
|
||||
} else {
|
||||
// Native transport with OpenSSL
|
||||
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL);
|
||||
}
|
||||
if (config.useDefaultCiphers) {
|
||||
if (useDefaultCiphers) {
|
||||
sslContextBuilder.ciphers(null);
|
||||
}
|
||||
sslContext = sslContextBuilder.build();
|
||||
|
|
@ -181,7 +185,7 @@ final class Utils {
|
|||
.setDaemon(true)
|
||||
.setNameFormat("ELG-%d")
|
||||
.build();
|
||||
switch (config.transport) {
|
||||
switch (transport) {
|
||||
case NETTY_NIO:
|
||||
group = new NioEventLoopGroup(0, tf);
|
||||
channelType = NioSocketChannel.class;
|
||||
|
|
@ -201,22 +205,28 @@ final class Utils {
|
|||
|
||||
default:
|
||||
// Should never get here.
|
||||
throw new IllegalArgumentException("Unsupported transport: " + config.transport);
|
||||
throw new IllegalArgumentException("Unsupported transport: " + transport);
|
||||
}
|
||||
NettyChannelBuilder builder = NettyChannelBuilder
|
||||
.forAddress(config.address)
|
||||
.forAddress(address)
|
||||
.eventLoopGroup(group)
|
||||
.channelType(channelType)
|
||||
.negotiationType(negotiationType)
|
||||
.sslContext(sslContext)
|
||||
.flowControlWindow(config.flowControlWindow);
|
||||
if (config.directExecutor) {
|
||||
.flowControlWindow(flowControlWindow);
|
||||
if (authorityOverride != null) {
|
||||
builder.overrideAuthority(authorityOverride);
|
||||
}
|
||||
if (directExecutor) {
|
||||
builder.directExecutor();
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
static void saveHistogram(Histogram histogram, String filename) throws IOException {
|
||||
/**
|
||||
* Save a {@link Histogram} to a file.
|
||||
*/
|
||||
public static void saveHistogram(Histogram histogram, String filename) throws IOException {
|
||||
File file;
|
||||
PrintStream log = null;
|
||||
try {
|
||||
|
|
@ -232,4 +242,40 @@ final class Utils {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a {@link SimpleResponse} for the given request.
|
||||
*/
|
||||
public static SimpleResponse makeResponse(SimpleRequest request) {
|
||||
if (request.getResponseSize() > 0) {
|
||||
if (!Messages.PayloadType.COMPRESSABLE.equals(request.getResponseType())) {
|
||||
throw Status.INTERNAL.augmentDescription("Error creating payload.").asRuntimeException();
|
||||
}
|
||||
|
||||
ByteString body = ByteString.copyFrom(new byte[request.getResponseSize()]);
|
||||
Messages.PayloadType type = request.getResponseType();
|
||||
|
||||
Payload payload = Payload.newBuilder().setType(type).setBody(body).build();
|
||||
return SimpleResponse.newBuilder().setPayload(payload).build();
|
||||
}
|
||||
return SimpleResponse.getDefaultInstance();
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a {@link SimpleRequest} with the specified dimensions.
|
||||
*/
|
||||
public static SimpleRequest makeRequest(Messages.PayloadType payloadType, int reqLength,
|
||||
int respLength) {
|
||||
ByteString body = ByteString.copyFrom(new byte[reqLength]);
|
||||
Payload payload = Payload.newBuilder()
|
||||
.setType(payloadType)
|
||||
.setBody(body)
|
||||
.build();
|
||||
|
||||
return SimpleRequest.newBuilder()
|
||||
.setResponseType(payloadType)
|
||||
.setResponseSize(respLength)
|
||||
.setPayload(payload)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,533 @@
|
|||
/*
|
||||
* Copyright 2016, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.benchmarks.driver;
|
||||
|
||||
import com.sun.management.OperatingSystemMXBean;
|
||||
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.benchmarks.Transport;
|
||||
import io.grpc.benchmarks.Utils;
|
||||
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
|
||||
import io.grpc.benchmarks.proto.Control;
|
||||
import io.grpc.benchmarks.proto.Messages;
|
||||
import io.grpc.benchmarks.proto.Payloads;
|
||||
import io.grpc.benchmarks.proto.Stats;
|
||||
import io.grpc.internal.ManagedChannelImpl;
|
||||
import io.grpc.stub.ClientCalls;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.channel.epoll.Epoll;
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
|
||||
import org.HdrHistogram.Histogram;
|
||||
import org.HdrHistogram.LogarithmicIterator;
|
||||
import org.HdrHistogram.Recorder;
|
||||
import org.apache.commons.math3.distribution.ExponentialDistribution;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
* Implements the client-side contract for the load testing scenarios.
|
||||
*/
|
||||
class LoadClient {
|
||||
|
||||
private static final Logger log = Logger.getLogger(LoadClient.class.getName());
|
||||
private ByteBuf genericRequest;
|
||||
|
||||
private final Control.ClientConfig config;
|
||||
private final ExponentialDistribution distribution;
|
||||
private volatile boolean shutdown;
|
||||
private final int threadCount;
|
||||
|
||||
ManagedChannel[] channels;
|
||||
BenchmarkServiceGrpc.BenchmarkServiceBlockingStub[] blockingStubs;
|
||||
BenchmarkServiceGrpc.BenchmarkServiceStub[] asyncStubs;
|
||||
Recorder recorder;
|
||||
private ExecutorService fixedThreadPool;
|
||||
private Messages.SimpleRequest simpleRequest;
|
||||
private final OperatingSystemMXBean osBean;
|
||||
private long lastMarkCpuTime;
|
||||
|
||||
LoadClient(Control.ClientConfig config) throws Exception {
|
||||
log.log(Level.INFO, "Client Config \n" + config.toString());
|
||||
this.config = config;
|
||||
// Create the channels
|
||||
channels = new ManagedChannelImpl[config.getClientChannels()];
|
||||
for (int i = 0; i < config.getClientChannels(); i++) {
|
||||
channels[i] =
|
||||
Utils.newClientChannel(
|
||||
Epoll.isAvailable() ? Transport.NETTY_EPOLL : Transport.NETTY_NIO,
|
||||
Utils.parseSocketAddress(config.getServerTargets(i % config.getServerTargetsCount())),
|
||||
config.hasSecurityParams(),
|
||||
config.hasSecurityParams() && config.getSecurityParams().getUseTestCa(),
|
||||
config.hasSecurityParams()
|
||||
? config.getSecurityParams().getServerHostOverride() :
|
||||
null,
|
||||
true,
|
||||
Utils.DEFAULT_FLOW_CONTROL_WINDOW,
|
||||
false);
|
||||
}
|
||||
|
||||
// Create a stub per channel
|
||||
if (config.getClientType() == Control.ClientType.ASYNC_CLIENT) {
|
||||
asyncStubs = new BenchmarkServiceGrpc.BenchmarkServiceStub[channels.length];
|
||||
for (int i = 0; i < channels.length; i++) {
|
||||
asyncStubs[i] = BenchmarkServiceGrpc.newStub(channels[i]);
|
||||
}
|
||||
} else {
|
||||
blockingStubs = new BenchmarkServiceGrpc.BenchmarkServiceBlockingStub[channels.length];
|
||||
for (int i = 0; i < channels.length; i++) {
|
||||
blockingStubs[i] = BenchmarkServiceGrpc.newBlockingStub(channels[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Determine no of threads
|
||||
if (config.getClientType() == Control.ClientType.SYNC_CLIENT) {
|
||||
threadCount = config.getOutstandingRpcsPerChannel() * config.getClientChannels();
|
||||
} else {
|
||||
threadCount = config.getAsyncClientThreads() == 0
|
||||
? Runtime.getRuntime().availableProcessors()
|
||||
: config.getAsyncClientThreads();
|
||||
}
|
||||
// Use a fixed sized pool of daemon threads.
|
||||
fixedThreadPool = Executors.newFixedThreadPool(threadCount,
|
||||
new DefaultThreadFactory("client-worker", true));
|
||||
|
||||
// Create the load distribution
|
||||
if (config.getLoadParams().getClosedLoop() != null) {
|
||||
distribution = null;
|
||||
} else if (config.getLoadParams().getPoisson() != null) {
|
||||
// Mean of exp distribution per thread is <no threads> / <offered load per second>
|
||||
distribution = new ExponentialDistribution(
|
||||
(double) threadCount / config.getLoadParams().getPoisson().getOfferedLoad());
|
||||
} else {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
// Create payloads
|
||||
switch (config.getPayloadConfig().getPayloadCase()) {
|
||||
case SIMPLE_PARAMS: {
|
||||
Payloads.SimpleProtoParams simpleParams = config.getPayloadConfig().getSimpleParams();
|
||||
simpleRequest = Utils.makeRequest(Messages.PayloadType.COMPRESSABLE,
|
||||
simpleParams.getReqSize(), simpleParams.getRespSize());
|
||||
break;
|
||||
}
|
||||
case BYTEBUF_PARAMS: {
|
||||
PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
|
||||
genericRequest = alloc.buffer(config.getPayloadConfig().getBytebufParams().getRespSize());
|
||||
if (genericRequest.capacity() > 0) {
|
||||
genericRequest.writerIndex(genericRequest.capacity() - 1);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
// Not implemented yet
|
||||
throw new IllegalArgumentException("Scenario not implemented");
|
||||
}
|
||||
}
|
||||
|
||||
List<OperatingSystemMXBean> beans =
|
||||
ManagementFactory.getPlatformMXBeans(OperatingSystemMXBean.class);
|
||||
if (!beans.isEmpty()) {
|
||||
osBean = beans.get(0);
|
||||
} else {
|
||||
osBean = null;
|
||||
}
|
||||
|
||||
// Create the histogram recorder
|
||||
recorder = new Recorder((long) config.getHistogramParams().getMaxPossible(), 3);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the load scenario.
|
||||
*/
|
||||
void start() {
|
||||
Runnable r;
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
r = null;
|
||||
switch (config.getPayloadConfig().getPayloadCase()) {
|
||||
case SIMPLE_PARAMS: {
|
||||
if (config.getClientType() == Control.ClientType.SYNC_CLIENT) {
|
||||
if (config.getRpcType() == Control.RpcType.UNARY) {
|
||||
r = new BlockingUnaryWorker(blockingStubs[i % blockingStubs.length]);
|
||||
}
|
||||
} else if (config.getClientType() == Control.ClientType.ASYNC_CLIENT) {
|
||||
if (config.getRpcType() == Control.RpcType.UNARY) {
|
||||
r = new AsyncUnaryWorker(asyncStubs[i % asyncStubs.length]);
|
||||
} else if (config.getRpcType() == Control.RpcType.STREAMING) {
|
||||
r = new AsyncPingPongWorker(asyncStubs[i % asyncStubs.length]);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case BYTEBUF_PARAMS: {
|
||||
if (config.getClientType() == Control.ClientType.SYNC_CLIENT) {
|
||||
if (config.getRpcType() == Control.RpcType.UNARY) {
|
||||
r = new GenericBlockingUnaryWorker(channels[i % channels.length]);
|
||||
}
|
||||
} else if (config.getClientType() == Control.ClientType.ASYNC_CLIENT) {
|
||||
if (config.getRpcType() == Control.RpcType.UNARY) {
|
||||
r = new GenericAsyncUnaryWorker(channels[i % channels.length]);
|
||||
} else if (config.getRpcType() == Control.RpcType.STREAMING) {
|
||||
r = new GenericAsyncPingPongWorker(channels[i % channels.length]);
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
throw Status.UNIMPLEMENTED.withDescription(
|
||||
"Unknown payload case " + config.getPayloadConfig().getPayloadCase().name())
|
||||
.asRuntimeException();
|
||||
}
|
||||
}
|
||||
if (r == null) {
|
||||
throw new IllegalStateException(config.getRpcType().name()
|
||||
+ " not supported for client type "
|
||||
+ config.getClientType());
|
||||
}
|
||||
fixedThreadPool.execute(r);
|
||||
}
|
||||
if (osBean != null) {
|
||||
lastMarkCpuTime = osBean.getProcessCpuTime();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Take a snapshot of the statistics which can be returned to the driver.
|
||||
*/
|
||||
Stats.ClientStats getStats() {
|
||||
Histogram intervalHistogram = recorder.getIntervalHistogram();
|
||||
|
||||
Stats.ClientStats.Builder statsBuilder = Stats.ClientStats.newBuilder();
|
||||
Stats.HistogramData.Builder latenciesBuilder = statsBuilder.getLatenciesBuilder();
|
||||
double resolution = 1.0 + Math.max(config.getHistogramParams().getResolution(), 0.01);
|
||||
LogarithmicIterator logIterator = new LogarithmicIterator(intervalHistogram, 1,
|
||||
resolution);
|
||||
double base = 1;
|
||||
while (logIterator.hasNext()) {
|
||||
latenciesBuilder.addBucket((int) logIterator.next().getCountAddedInThisIterationStep());
|
||||
base = base * resolution;
|
||||
}
|
||||
// Driver expects values for all buckets in the range, not just the range of buckets that
|
||||
// have values.
|
||||
while (base < config.getHistogramParams().getMaxPossible()) {
|
||||
latenciesBuilder.addBucket(0);
|
||||
base = base * resolution;
|
||||
}
|
||||
latenciesBuilder.setMaxSeen((double) intervalHistogram.getMaxValue());
|
||||
latenciesBuilder.setMinSeen((double) intervalHistogram.getMinNonZeroValue());
|
||||
latenciesBuilder.setCount(intervalHistogram.getTotalCount());
|
||||
latenciesBuilder.setSum(intervalHistogram.getMean()
|
||||
* intervalHistogram.getTotalCount());
|
||||
// TODO: No support for sum of squares
|
||||
|
||||
statsBuilder.setTimeElapsed(((double)(intervalHistogram.getEndTimeStamp()
|
||||
- intervalHistogram.getStartTimeStamp())) / 1000.0);
|
||||
if (osBean != null) {
|
||||
// Report all the CPU time as user-time (which is intentionally incorrect)
|
||||
long nowCpu = osBean.getProcessCpuTime();
|
||||
statsBuilder.setTimeUser(((double) nowCpu - lastMarkCpuTime) / 1000000000.0);
|
||||
lastMarkCpuTime = nowCpu;
|
||||
}
|
||||
return statsBuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the scenario as cleanly as possible.
|
||||
*/
|
||||
void shutdownNow() {
|
||||
shutdown = true;
|
||||
for (int i = 0; i < channels.length; i++) {
|
||||
// Initiate channel shutdown
|
||||
channels[i].shutdown();
|
||||
}
|
||||
for (int i = 0; i < channels.length; i++) {
|
||||
try {
|
||||
// Wait for channel termination
|
||||
channels[i].awaitTermination(1, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException ie) {
|
||||
channels[i].shutdownNow();
|
||||
}
|
||||
}
|
||||
fixedThreadPool.shutdownNow();
|
||||
}
|
||||
|
||||
/**
|
||||
* Record the event elapsed time to the histogram and delay initiation of the next event based
|
||||
* on the load distribution.
|
||||
*/
|
||||
void delay(long alreadyElapsed) {
|
||||
recorder.recordValue(alreadyElapsed);
|
||||
if (distribution != null) {
|
||||
long nextPermitted = Math.round(distribution.sample() * 1000000000.0);
|
||||
if (nextPermitted > alreadyElapsed) {
|
||||
LockSupport.parkNanos(nextPermitted - alreadyElapsed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker which executes blocking unary calls. Event timing is the duration between sending the
|
||||
* request and receiving the response.
|
||||
*/
|
||||
class BlockingUnaryWorker implements Runnable {
|
||||
final BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub;
|
||||
|
||||
private BlockingUnaryWorker(BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub) {
|
||||
this.stub = stub;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (!shutdown) {
|
||||
long now = System.nanoTime();
|
||||
stub.unaryCall(simpleRequest);
|
||||
delay(System.nanoTime() - now);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker which executes async unary calls. Event timing is the duration between sending the
|
||||
* request and receiving the response.
|
||||
*/
|
||||
private class AsyncUnaryWorker implements Runnable {
|
||||
final BenchmarkServiceGrpc.BenchmarkServiceStub stub;
|
||||
final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel());
|
||||
|
||||
AsyncUnaryWorker(BenchmarkServiceGrpc.BenchmarkServiceStub stub) {
|
||||
this.stub = stub;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!shutdown) {
|
||||
maxOutstanding.acquireUninterruptibly();
|
||||
stub.unaryCall(simpleRequest, new StreamObserver<Messages.SimpleResponse>() {
|
||||
long now = System.nanoTime();
|
||||
@Override
|
||||
public void onNext(Messages.SimpleResponse value) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
maxOutstanding.release();
|
||||
log.log(Level.INFO, "Error in AsyncUnary call", t);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
delay(System.nanoTime() - now);
|
||||
maxOutstanding.release();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker which executes a streaming ping-pong call. Event timing is the duration between
|
||||
* sending the ping and receiving the pong.
|
||||
*/
|
||||
private class AsyncPingPongWorker implements Runnable {
|
||||
final BenchmarkServiceGrpc.BenchmarkServiceStub stub;
|
||||
final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel());
|
||||
|
||||
AsyncPingPongWorker(BenchmarkServiceGrpc.BenchmarkServiceStub stub) {
|
||||
this.stub = stub;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!shutdown) {
|
||||
maxOutstanding.acquireUninterruptibly();
|
||||
final AtomicReference<StreamObserver<Messages.SimpleRequest>> requestObserver =
|
||||
new AtomicReference<StreamObserver<Messages.SimpleRequest>>();
|
||||
requestObserver.set(stub.streamingCall(
|
||||
new StreamObserver<Messages.SimpleResponse>() {
|
||||
long now = System.nanoTime();
|
||||
|
||||
@Override
|
||||
public void onNext(Messages.SimpleResponse value) {
|
||||
delay(System.nanoTime() - now);
|
||||
requestObserver.get().onNext(simpleRequest);
|
||||
now = System.nanoTime();
|
||||
if (shutdown) {
|
||||
requestObserver.get().onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
maxOutstanding.release();
|
||||
log.log(Level.INFO, "Error in Async Ping-Pong call", t);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
maxOutstanding.release();
|
||||
}
|
||||
}));
|
||||
requestObserver.get().onNext(simpleRequest);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker which executes generic blocking unary calls. Event timing is the duration between
|
||||
* sending the request and receiving the response.
|
||||
*/
|
||||
private class GenericBlockingUnaryWorker implements Runnable {
|
||||
final Channel channel;
|
||||
|
||||
GenericBlockingUnaryWorker(Channel channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
long now;
|
||||
while (!shutdown) {
|
||||
now = System.nanoTime();
|
||||
ClientCalls.blockingUnaryCall(channel, LoadServer.GENERIC_UNARY_METHOD,
|
||||
CallOptions.DEFAULT,
|
||||
genericRequest.slice());
|
||||
delay(System.nanoTime() - now);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker which executes generic async unary calls. Event timing is the duration between
|
||||
* sending the request and receiving the response.
|
||||
*/
|
||||
private class GenericAsyncUnaryWorker implements Runnable {
|
||||
final Channel channel;
|
||||
final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel());
|
||||
|
||||
GenericAsyncUnaryWorker(Channel channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (!shutdown) {
|
||||
maxOutstanding.acquireUninterruptibly();
|
||||
ClientCalls.asyncUnaryCall(
|
||||
channel.newCall(LoadServer.GENERIC_UNARY_METHOD, CallOptions.DEFAULT),
|
||||
genericRequest.slice(),
|
||||
new StreamObserver<ByteBuf>() {
|
||||
long now = System.nanoTime();
|
||||
@Override
|
||||
public void onNext(ByteBuf value) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
maxOutstanding.release();
|
||||
log.log(Level.INFO, "Error in Generic Async Unary call", t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
delay(System.nanoTime() - now);
|
||||
maxOutstanding.release();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Worker which executes a streaming ping-pong call. Event timing is the duration between
|
||||
* sending the ping and receiving the pong.
|
||||
*/
|
||||
private class GenericAsyncPingPongWorker implements Runnable {
|
||||
final Semaphore maxOutstanding = new Semaphore(config.getOutstandingRpcsPerChannel());
|
||||
final Channel channel;
|
||||
|
||||
GenericAsyncPingPongWorker(Channel channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!shutdown) {
|
||||
maxOutstanding.acquireUninterruptibly();
|
||||
final ClientCall<ByteBuf, ByteBuf> call =
|
||||
channel.newCall(LoadServer.GENERIC_STREAMING_PING_PONG_METHOD, CallOptions.DEFAULT);
|
||||
call.start(new ClientCall.Listener<ByteBuf>() {
|
||||
long now = System.nanoTime();
|
||||
|
||||
@Override
|
||||
public void onMessage(ByteBuf message) {
|
||||
delay(System.nanoTime() - now);
|
||||
call.request(1);
|
||||
call.sendMessage(genericRequest.slice());
|
||||
now = System.nanoTime();
|
||||
if (shutdown) {
|
||||
call.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(Status status, Metadata trailers) {
|
||||
maxOutstanding.release();
|
||||
if (!status.isOk() && status.getCode() != Status.Code.CANCELLED) {
|
||||
log.log(Level.INFO, "Error in Generic Async Ping-Pong call", status.getCause());
|
||||
}
|
||||
}
|
||||
}, new Metadata());
|
||||
call.request(1);
|
||||
call.sendMessage(genericRequest.slice());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,262 @@
|
|||
/*
|
||||
* Copyright 2016, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.benchmarks.driver;
|
||||
|
||||
import com.sun.management.OperatingSystemMXBean;
|
||||
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.Server;
|
||||
import io.grpc.ServerBuilder;
|
||||
import io.grpc.ServerCall;
|
||||
import io.grpc.ServerCallHandler;
|
||||
import io.grpc.ServerServiceDefinition;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.benchmarks.ByteBufOutputMarshaller;
|
||||
import io.grpc.benchmarks.Utils;
|
||||
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
|
||||
import io.grpc.benchmarks.proto.Control;
|
||||
import io.grpc.benchmarks.proto.Messages;
|
||||
import io.grpc.benchmarks.proto.Stats;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.grpc.testing.TestUtils;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
* Implements the server-side contract for the load testing scenarios.
|
||||
*/
|
||||
final class LoadServer {
|
||||
|
||||
/**
|
||||
* Generic version of the unary method call.
|
||||
*/
|
||||
static final MethodDescriptor<ByteBuf, ByteBuf> GENERIC_UNARY_METHOD =
|
||||
MethodDescriptor.create(
|
||||
BenchmarkServiceGrpc.METHOD_UNARY_CALL.getType(),
|
||||
BenchmarkServiceGrpc.METHOD_UNARY_CALL.getFullMethodName(),
|
||||
new ByteBufOutputMarshaller(),
|
||||
new ByteBufOutputMarshaller());
|
||||
|
||||
/**
|
||||
* Generic version of the streaming ping-pong method call.
|
||||
*/
|
||||
static final MethodDescriptor<ByteBuf, ByteBuf> GENERIC_STREAMING_PING_PONG_METHOD =
|
||||
MethodDescriptor.create(
|
||||
BenchmarkServiceGrpc.METHOD_STREAMING_CALL.getType(),
|
||||
BenchmarkServiceGrpc.METHOD_STREAMING_CALL.getFullMethodName(),
|
||||
new ByteBufOutputMarshaller(),
|
||||
new ByteBufOutputMarshaller());
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(LoadServer.class.getName());
|
||||
|
||||
private final Server server;
|
||||
private final BenchmarkServiceImpl benchmarkService;
|
||||
private final OperatingSystemMXBean osBean;
|
||||
private volatile boolean shutdown;
|
||||
private final int port;
|
||||
private ByteBuf genericResponse;
|
||||
private long lastStatTime;
|
||||
private long lastMarkCpuTime;
|
||||
|
||||
LoadServer(Control.ServerConfig config) throws Exception {
|
||||
LOG.log(Level.INFO, "Server Config \n" + config.toString());
|
||||
port = config.getPort() == 0 ? TestUtils.pickUnusedPort() : config.getPort();
|
||||
ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port);
|
||||
int asyncThreads = config.getAsyncServerThreads() == 0
|
||||
? Runtime.getRuntime().availableProcessors()
|
||||
: config.getAsyncServerThreads();
|
||||
// The concepts of sync & async server are quite different in the C impl and the names
|
||||
// chosen for the enum are based on that implementation. We use 'sync' to mean
|
||||
// the direct executor case in Java even though the service implementations are always
|
||||
// fully async.
|
||||
switch (config.getServerType()) {
|
||||
case ASYNC_SERVER: {
|
||||
serverBuilder.executor(Executors.newFixedThreadPool(asyncThreads,
|
||||
new DefaultThreadFactory("server-worker", true)));
|
||||
break;
|
||||
}
|
||||
case SYNC_SERVER: {
|
||||
serverBuilder.directExecutor();
|
||||
break;
|
||||
}
|
||||
case ASYNC_GENERIC_SERVER: {
|
||||
serverBuilder.executor(Executors.newFixedThreadPool(asyncThreads,
|
||||
new DefaultThreadFactory("server-worker", true)));
|
||||
// Create buffers for the generic service
|
||||
PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
|
||||
genericResponse = alloc.buffer(config.getPayloadConfig().getBytebufParams().getRespSize());
|
||||
if (genericResponse.capacity() > 0) {
|
||||
genericResponse.writerIndex(genericResponse.capacity() - 1);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
}
|
||||
if (config.hasSecurityParams()) {
|
||||
File cert = TestUtils.loadCert("server1.pem");
|
||||
File key = TestUtils.loadCert("server1.key");
|
||||
serverBuilder.useTransportSecurity(cert, key);
|
||||
}
|
||||
benchmarkService = new BenchmarkServiceImpl();
|
||||
if (config.getServerType() == Control.ServerType.ASYNC_GENERIC_SERVER) {
|
||||
serverBuilder.addService(
|
||||
ServerServiceDefinition
|
||||
.builder(BenchmarkServiceGrpc.SERVICE_NAME)
|
||||
.addMethod(GENERIC_STREAMING_PING_PONG_METHOD, new GenericServiceCallHandler())
|
||||
.build());
|
||||
} else {
|
||||
serverBuilder.addService(BenchmarkServiceGrpc.bindService(benchmarkService));
|
||||
}
|
||||
server = serverBuilder.build();
|
||||
|
||||
List<OperatingSystemMXBean> beans =
|
||||
ManagementFactory.getPlatformMXBeans(OperatingSystemMXBean.class);
|
||||
if (!beans.isEmpty()) {
|
||||
osBean = beans.get(0);
|
||||
} else {
|
||||
osBean = null;
|
||||
}
|
||||
}
|
||||
|
||||
int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
int getCores() {
|
||||
return Runtime.getRuntime().availableProcessors();
|
||||
}
|
||||
|
||||
void start() throws Exception {
|
||||
server.start();
|
||||
lastStatTime = System.nanoTime();
|
||||
if (osBean != null) {
|
||||
lastMarkCpuTime = osBean.getProcessCpuTime();
|
||||
}
|
||||
}
|
||||
|
||||
Stats.ServerStats getStats() {
|
||||
Stats.ServerStats.Builder builder = Stats.ServerStats.newBuilder();
|
||||
long now = System.nanoTime();
|
||||
double elapsed = ((double) now - lastStatTime) / 1000000000.0;
|
||||
lastStatTime = now;
|
||||
builder.setTimeElapsed(elapsed);
|
||||
if (osBean != null) {
|
||||
// Report all the CPU time as user-time (which is intentionally incorrect)
|
||||
long nowCpu = osBean.getProcessCpuTime();
|
||||
builder.setTimeUser(((double) nowCpu - lastMarkCpuTime) / 1000000000.0);
|
||||
lastMarkCpuTime = nowCpu;
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
void shutdownNow() {
|
||||
shutdown = true;
|
||||
server.shutdownNow();
|
||||
}
|
||||
|
||||
private class BenchmarkServiceImpl implements BenchmarkServiceGrpc.BenchmarkService {
|
||||
|
||||
@Override
|
||||
public void unaryCall(Messages.SimpleRequest request,
|
||||
StreamObserver<Messages.SimpleResponse> responseObserver) {
|
||||
responseObserver.onNext(Utils.makeResponse(request));
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamObserver<Messages.SimpleRequest> streamingCall(
|
||||
final StreamObserver<Messages.SimpleResponse> responseObserver) {
|
||||
return new StreamObserver<Messages.SimpleRequest>() {
|
||||
@Override
|
||||
public void onNext(Messages.SimpleRequest value) {
|
||||
if (!shutdown) {
|
||||
responseObserver.onNext(Utils.makeResponse(value));
|
||||
} else {
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
responseObserver.onError(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private class GenericServiceCallHandler implements ServerCallHandler<ByteBuf, ByteBuf> {
|
||||
@Override
|
||||
public ServerCall.Listener<ByteBuf> startCall(MethodDescriptor<ByteBuf, ByteBuf> method,
|
||||
final ServerCall<ByteBuf> call, Metadata headers) {
|
||||
call.sendHeaders(new Metadata());
|
||||
call.request(1);
|
||||
return new ServerCall.Listener<ByteBuf>() {
|
||||
@Override
|
||||
public void onMessage(ByteBuf message) {
|
||||
// no-op
|
||||
message.release();
|
||||
call.request(1);
|
||||
call.sendMessage(genericResponse.slice());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHalfClose() {
|
||||
call.close(Status.OK, new Metadata());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCancel() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,262 @@
|
|||
/*
|
||||
* Copyright 2016, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.benchmarks.driver;
|
||||
|
||||
import io.grpc.Status;
|
||||
import io.grpc.benchmarks.proto.Control;
|
||||
import io.grpc.benchmarks.proto.WorkerServiceGrpc;
|
||||
import io.grpc.internal.ServerImpl;
|
||||
import io.grpc.netty.NettyServerBuilder;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
* A load worker process which a driver can use to create clients and servers. The worker
|
||||
* implements the contract defined in 'control.proto'.
|
||||
*/
|
||||
public class LoadWorker {
|
||||
|
||||
private static final Logger log = Logger.getLogger(LoadWorker.class.getName());
|
||||
|
||||
private final int serverPort;
|
||||
private final ServerImpl driverServer;
|
||||
|
||||
LoadWorker(int driverPort, int serverPort) throws Exception {
|
||||
this.serverPort = serverPort;
|
||||
NioEventLoopGroup singleThreadGroup = new NioEventLoopGroup(1);
|
||||
this.driverServer = NettyServerBuilder.forPort(driverPort)
|
||||
.directExecutor()
|
||||
.workerEventLoopGroup(singleThreadGroup)
|
||||
.bossEventLoopGroup(singleThreadGroup)
|
||||
.addService(WorkerServiceGrpc.bindService(new WorkerServiceImpl()))
|
||||
.build();
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
driverServer.start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the load worker process.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
boolean usage = false;
|
||||
int serverPort = 0;
|
||||
int driverPort = 0;
|
||||
for (String arg : args) {
|
||||
if (!arg.startsWith("--")) {
|
||||
System.err.println("All arguments must start with '--': " + arg);
|
||||
usage = true;
|
||||
break;
|
||||
}
|
||||
String[] parts = arg.substring(2).split("=", 2);
|
||||
String key = parts[0];
|
||||
if ("help".equals(key)) {
|
||||
usage = true;
|
||||
break;
|
||||
}
|
||||
if (parts.length != 2) {
|
||||
System.err.println("All arguments must be of the form --arg=value");
|
||||
usage = true;
|
||||
break;
|
||||
}
|
||||
String value = parts[1];
|
||||
if ("server_port".equals(key)) {
|
||||
serverPort = Integer.valueOf(value);
|
||||
} else if ("driver_port".equals(key)) {
|
||||
driverPort = Integer.valueOf(value);
|
||||
} else {
|
||||
System.err.println("Unknown argument: " + key);
|
||||
usage = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (usage || driverPort == 0) {
|
||||
System.err.println(
|
||||
"Usage: [ARGS...]"
|
||||
+ "\n"
|
||||
+ "\n --driver_port=<port>"
|
||||
+ "\n Port to expose grpc.testing.WorkerService, used by driver to initiate work."
|
||||
+ "\n --server_port=<port>"
|
||||
+ "\n Port to start load servers on. Defaults to any available port");
|
||||
System.exit(1);
|
||||
}
|
||||
LoadWorker loadWorker = new LoadWorker(driverPort, serverPort);
|
||||
loadWorker.start();
|
||||
loadWorker.driverServer.awaitTermination();
|
||||
}
|
||||
|
||||
/**
|
||||
* Implement the worker service contract which can launch clients and servers.
|
||||
*/
|
||||
private class WorkerServiceImpl implements WorkerServiceGrpc.WorkerService {
|
||||
|
||||
private LoadServer workerServer;
|
||||
private LoadClient workerClient;
|
||||
|
||||
@Override
|
||||
public StreamObserver<Control.ServerArgs> runServer(
|
||||
final StreamObserver<Control.ServerStatus> responseObserver) {
|
||||
return new StreamObserver<Control.ServerArgs>() {
|
||||
@Override
|
||||
public void onNext(Control.ServerArgs value) {
|
||||
try {
|
||||
if (value.getSetup() != null && workerServer == null) {
|
||||
if (serverPort != 0 && value.getSetup().getPort() == 0) {
|
||||
Control.ServerArgs.Builder builder = value.toBuilder();
|
||||
builder.getSetupBuilder().setPort(serverPort);
|
||||
value = builder.build();
|
||||
}
|
||||
workerServer = new LoadServer(value.getSetup());
|
||||
workerServer.start();
|
||||
responseObserver.onNext(Control.ServerStatus.newBuilder()
|
||||
.setPort(workerServer.getPort())
|
||||
.setCores(workerServer.getCores())
|
||||
.build());
|
||||
} else if (value.getMark() != null && workerServer != null) {
|
||||
responseObserver.onNext(Control.ServerStatus.newBuilder()
|
||||
.setStats(workerServer.getStats())
|
||||
.build());
|
||||
} else {
|
||||
responseObserver.onError(Status.ALREADY_EXISTS
|
||||
.withDescription("Server already started")
|
||||
.asRuntimeException());
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.log(Level.WARNING, "Error running server", t);
|
||||
responseObserver.onError(Status.INTERNAL.withCause(t).asException());
|
||||
// Shutdown server if we can
|
||||
onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
Status status = Status.fromThrowable(t);
|
||||
if (status.getCode() != Status.Code.CANCELLED) {
|
||||
log.log(Level.WARNING, "Error driving server", t);
|
||||
}
|
||||
onCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
try {
|
||||
if (workerServer != null) {
|
||||
workerServer.shutdownNow();
|
||||
}
|
||||
} finally {
|
||||
workerServer = null;
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamObserver<Control.ClientArgs> runClient(
|
||||
final StreamObserver<Control.ClientStatus> responseObserver) {
|
||||
return new StreamObserver<Control.ClientArgs>() {
|
||||
@Override
|
||||
public void onNext(Control.ClientArgs value) {
|
||||
try {
|
||||
if (value.getSetup() != null && workerClient == null) {
|
||||
workerClient = new LoadClient(value.getSetup());
|
||||
workerClient.start();
|
||||
responseObserver.onNext(Control.ClientStatus.newBuilder().build());
|
||||
} else if (value.getMark() != null && workerClient != null) {
|
||||
responseObserver.onNext(Control.ClientStatus.newBuilder()
|
||||
.setStats(workerClient.getStats())
|
||||
.build());
|
||||
} else {
|
||||
responseObserver.onError(Status.ALREADY_EXISTS
|
||||
.withDescription("Client already started")
|
||||
.asRuntimeException());
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.log(Level.WARNING, "Error running client", t);
|
||||
responseObserver.onError(Status.INTERNAL.withCause(t).asException());
|
||||
// Shutdown the client if we can
|
||||
onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
Status status = Status.fromThrowable(t);
|
||||
if (status.getCode() != Status.Code.CANCELLED) {
|
||||
log.log(Level.WARNING, "Error driving client", t);
|
||||
}
|
||||
onCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
try {
|
||||
if (workerClient != null) {
|
||||
workerClient.shutdownNow();
|
||||
}
|
||||
} finally {
|
||||
workerClient = null;
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void coreCount(Control.CoreRequest request,
|
||||
StreamObserver<Control.CoreResponse> responseObserver) {
|
||||
responseObserver.onNext(
|
||||
Control.CoreResponse.newBuilder()
|
||||
.setCores(Runtime.getRuntime().availableProcessors())
|
||||
.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void quitWorker(Control.Void request,
|
||||
StreamObserver<Control.Void> responseObserver) {
|
||||
try {
|
||||
responseObserver.onNext(Control.Void.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
driverServer.shutdownNow();
|
||||
} catch (Throwable t) {
|
||||
log.log(Level.WARNING, "Error during shutdown", t);
|
||||
}
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -31,6 +31,9 @@
|
|||
|
||||
package io.grpc.benchmarks.qps;
|
||||
|
||||
import static io.grpc.benchmarks.Utils.HISTOGRAM_MAX_VALUE;
|
||||
import static io.grpc.benchmarks.Utils.HISTOGRAM_PRECISION;
|
||||
import static io.grpc.benchmarks.Utils.saveHistogram;
|
||||
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS;
|
||||
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CHANNELS;
|
||||
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD;
|
||||
|
|
@ -46,10 +49,6 @@ import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS;
|
|||
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TRANSPORT;
|
||||
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.USE_DEFAULT_CIPHERS;
|
||||
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.WARMUP_DURATION;
|
||||
import static io.grpc.benchmarks.qps.Utils.HISTOGRAM_MAX_VALUE;
|
||||
import static io.grpc.benchmarks.qps.Utils.HISTOGRAM_PRECISION;
|
||||
import static io.grpc.benchmarks.qps.Utils.newClientChannel;
|
||||
import static io.grpc.benchmarks.qps.Utils.saveHistogram;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.ByteString;
|
||||
|
|
@ -98,7 +97,7 @@ public class AsyncClient {
|
|||
|
||||
List<ManagedChannel> channels = new ArrayList<ManagedChannel>(config.channels);
|
||||
for (int i = 0; i < config.channels; i++) {
|
||||
channels.add(newClientChannel(config));
|
||||
channels.add(config.newChannel());
|
||||
}
|
||||
|
||||
// Do a warmup first. It's the same as the actual benchmark, except that
|
||||
|
|
|
|||
|
|
@ -31,13 +31,9 @@
|
|||
|
||||
package io.grpc.benchmarks.qps;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
import io.grpc.Server;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.benchmarks.Utils;
|
||||
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
|
||||
import io.grpc.benchmarks.proto.Messages.Payload;
|
||||
import io.grpc.benchmarks.proto.Messages.PayloadType;
|
||||
import io.grpc.benchmarks.proto.Messages.SimpleRequest;
|
||||
import io.grpc.benchmarks.proto.Messages.SimpleResponse;
|
||||
import io.grpc.netty.GrpcSslContexts;
|
||||
|
|
@ -185,7 +181,7 @@ public class AsyncServer {
|
|||
|
||||
@Override
|
||||
public void unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
|
||||
SimpleResponse response = buildSimpleResponse(request);
|
||||
SimpleResponse response = Utils.makeResponse(request);
|
||||
responseObserver.onNext(response);
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
|
|
@ -196,7 +192,7 @@ public class AsyncServer {
|
|||
return new StreamObserver<SimpleRequest>() {
|
||||
@Override
|
||||
public void onNext(SimpleRequest request) {
|
||||
SimpleResponse response = buildSimpleResponse(request);
|
||||
SimpleResponse response = Utils.makeResponse(request);
|
||||
responseObserver.onNext(response);
|
||||
}
|
||||
|
||||
|
|
@ -213,19 +209,5 @@ public class AsyncServer {
|
|||
};
|
||||
}
|
||||
|
||||
private static SimpleResponse buildSimpleResponse(SimpleRequest request) {
|
||||
if (request.getResponseSize() > 0) {
|
||||
if (!PayloadType.COMPRESSABLE.equals(request.getResponseType())) {
|
||||
throw Status.INTERNAL.augmentDescription("Error creating payload.").asRuntimeException();
|
||||
}
|
||||
|
||||
ByteString body = ByteString.copyFrom(new byte[request.getResponseSize()]);
|
||||
PayloadType type = request.getResponseType();
|
||||
|
||||
Payload payload = Payload.newBuilder().setType(type).setBody(body).build();
|
||||
return SimpleResponse.newBuilder().setPayload(payload).build();
|
||||
}
|
||||
return SimpleResponse.getDefaultInstance();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,17 +31,19 @@
|
|||
|
||||
package io.grpc.benchmarks.qps;
|
||||
|
||||
import static io.grpc.benchmarks.qps.SocketAddressValidator.INET;
|
||||
import static io.grpc.benchmarks.qps.SocketAddressValidator.UDS;
|
||||
import static io.grpc.benchmarks.qps.Utils.parseBoolean;
|
||||
import static io.grpc.benchmarks.Utils.parseBoolean;
|
||||
import static java.lang.Integer.parseInt;
|
||||
import static java.util.Arrays.asList;
|
||||
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.benchmarks.Transport;
|
||||
import io.grpc.benchmarks.Utils;
|
||||
import io.grpc.benchmarks.proto.Control.RpcType;
|
||||
import io.grpc.benchmarks.proto.Messages;
|
||||
import io.grpc.benchmarks.proto.Messages.PayloadType;
|
||||
import io.grpc.netty.NettyChannelBuilder;
|
||||
import io.grpc.testing.TestUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Collection;
|
||||
|
|
@ -52,12 +54,13 @@ import java.util.Set;
|
|||
/**
|
||||
* Configuration options for benchmark clients.
|
||||
*/
|
||||
class ClientConfiguration implements Configuration {
|
||||
public class ClientConfiguration implements Configuration {
|
||||
private static final ClientConfiguration DEFAULT = new ClientConfiguration();
|
||||
|
||||
Transport transport = Transport.NETTY_NIO;
|
||||
boolean tls;
|
||||
boolean testca;
|
||||
String authorityOverride = TestUtils.TEST_SERVER_HOST;
|
||||
boolean useDefaultCiphers;
|
||||
boolean directExecutor;
|
||||
SocketAddress address;
|
||||
|
|
@ -65,7 +68,7 @@ class ClientConfiguration implements Configuration {
|
|||
int outstandingRpcsPerChannel = 10;
|
||||
int serverPayload;
|
||||
int clientPayload;
|
||||
int flowControlWindow = NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW;
|
||||
int flowControlWindow = Utils.DEFAULT_FLOW_CONTROL_WINDOW;
|
||||
// seconds
|
||||
int duration = 60;
|
||||
// seconds
|
||||
|
|
@ -78,15 +81,24 @@ class ClientConfiguration implements Configuration {
|
|||
private ClientConfiguration() {
|
||||
}
|
||||
|
||||
public ManagedChannel newChannel() throws IOException {
|
||||
return Utils.newClientChannel(Transport.NETTY_EPOLL, address, tls, testca, authorityOverride,
|
||||
useDefaultCiphers, flowControlWindow, directExecutor);
|
||||
}
|
||||
|
||||
public Messages.SimpleRequest newRequest() {
|
||||
return Utils.makeRequest(payloadType, clientPayload, serverPayload);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a builder for configuring a client application with supported parameters. If no
|
||||
* parameters are provided, all parameters are assumed to be supported.
|
||||
*/
|
||||
static Builder newBuilder(ClientParam... supportedParams) {
|
||||
public static Builder newBuilder(ClientParam... supportedParams) {
|
||||
return new Builder(supportedParams);
|
||||
}
|
||||
|
||||
static class Builder extends AbstractConfigurationBuilder<ClientConfiguration> {
|
||||
public static class Builder extends AbstractConfigurationBuilder<ClientConfiguration> {
|
||||
private final Collection<Param> supportedParams;
|
||||
|
||||
private Builder(ClientParam... supportedParams) {
|
||||
|
|
@ -135,58 +147,6 @@ class ClientConfiguration implements Configuration {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* All of the supported transports.
|
||||
*/
|
||||
enum Transport {
|
||||
NETTY_NIO(true, "The Netty Java NIO transport. Using this with TLS requires "
|
||||
+ "that the Java bootclasspath be configured with Jetty ALPN boot.", INET),
|
||||
NETTY_EPOLL(true, "The Netty native EPOLL transport. Using this with TLS requires that "
|
||||
+ "OpenSSL be installed and configured as described in "
|
||||
+ "http://netty.io/wiki/forked-tomcat-native.html. Only supported on Linux.", INET),
|
||||
NETTY_UNIX_DOMAIN_SOCKET(false, "The Netty Unix Domain Socket transport. This currently "
|
||||
+ "does not support TLS.", UDS),
|
||||
OK_HTTP(true, "The OkHttp transport.", INET);
|
||||
|
||||
final boolean tlsSupported;
|
||||
final String description;
|
||||
final SocketAddressValidator socketAddressValidator;
|
||||
|
||||
Transport(boolean tlsSupported, String description,
|
||||
SocketAddressValidator socketAddressValidator) {
|
||||
this.tlsSupported = tlsSupported;
|
||||
this.description = description;
|
||||
this.socketAddressValidator = socketAddressValidator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the given address for this transport.
|
||||
*
|
||||
* @throws IllegalArgumentException if the given address is invalid for this transport.
|
||||
*/
|
||||
void validateSocketAddress(SocketAddress address) {
|
||||
if (!socketAddressValidator.isValidSocketAddress(address)) {
|
||||
throw new IllegalArgumentException(
|
||||
"Invalid address " + address + " for transport " + this);
|
||||
}
|
||||
}
|
||||
|
||||
static String getDescriptionString() {
|
||||
StringBuilder builder = new StringBuilder("Select the transport to use. Options:\n");
|
||||
boolean first = true;
|
||||
for (Transport transport : Transport.values()) {
|
||||
if (!first) {
|
||||
builder.append("\n");
|
||||
}
|
||||
builder.append(transport.name().toLowerCase());
|
||||
builder.append(": ");
|
||||
builder.append(transport.description);
|
||||
first = false;
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
||||
|
||||
enum ClientParam implements AbstractConfigurationBuilder.Param {
|
||||
ADDRESS("STR", "Socket address (host:port) or Unix Domain Socket file name "
|
||||
+ "(unix:///path/to/file), depending on the transport selected.", null, true) {
|
||||
|
|
|
|||
|
|
@ -32,6 +32,9 @@
|
|||
package io.grpc.benchmarks.qps;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static io.grpc.benchmarks.Utils.HISTOGRAM_MAX_VALUE;
|
||||
import static io.grpc.benchmarks.Utils.HISTOGRAM_PRECISION;
|
||||
import static io.grpc.benchmarks.Utils.saveHistogram;
|
||||
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.ADDRESS;
|
||||
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.CLIENT_PAYLOAD;
|
||||
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.DURATION;
|
||||
|
|
@ -43,11 +46,6 @@ import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TESTCA;
|
|||
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS;
|
||||
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TRANSPORT;
|
||||
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.USE_DEFAULT_CIPHERS;
|
||||
import static io.grpc.benchmarks.qps.Utils.HISTOGRAM_MAX_VALUE;
|
||||
import static io.grpc.benchmarks.qps.Utils.HISTOGRAM_PRECISION;
|
||||
import static io.grpc.benchmarks.qps.Utils.newClientChannel;
|
||||
import static io.grpc.benchmarks.qps.Utils.newRequest;
|
||||
import static io.grpc.benchmarks.qps.Utils.saveHistogram;
|
||||
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ManagedChannel;
|
||||
|
|
@ -108,8 +106,8 @@ public class OpenLoopClient {
|
|||
}
|
||||
config.channels = 1;
|
||||
config.directExecutor = true;
|
||||
ManagedChannel ch = newClientChannel(config);
|
||||
SimpleRequest req = newRequest(config);
|
||||
ManagedChannel ch = config.newChannel();
|
||||
SimpleRequest req = config.newRequest();
|
||||
LoadGenerationWorker worker =
|
||||
new LoadGenerationWorker(ch, req, config.targetQps, config.duration);
|
||||
final long start = System.nanoTime();
|
||||
|
|
|
|||
|
|
@ -31,11 +31,11 @@
|
|||
|
||||
package io.grpc.benchmarks.qps;
|
||||
|
||||
import static io.grpc.benchmarks.qps.SocketAddressValidator.INET;
|
||||
import static io.grpc.benchmarks.qps.SocketAddressValidator.UDS;
|
||||
import static io.grpc.benchmarks.qps.Utils.parseBoolean;
|
||||
import static io.grpc.benchmarks.Utils.parseBoolean;
|
||||
import static java.lang.Integer.parseInt;
|
||||
|
||||
import io.grpc.benchmarks.SocketAddressValidator;
|
||||
import io.grpc.benchmarks.Utils;
|
||||
import io.grpc.netty.NettyChannelBuilder;
|
||||
import io.grpc.testing.TestUtils;
|
||||
|
||||
|
|
@ -104,14 +104,17 @@ class ServerConfiguration implements Configuration {
|
|||
/**
|
||||
* All of the supported transports.
|
||||
*/
|
||||
enum Transport {
|
||||
public enum Transport {
|
||||
NETTY_NIO(true, "The Netty Java NIO transport. Using this with TLS requires "
|
||||
+ "that the Java bootclasspath be configured with Jetty ALPN boot.", INET),
|
||||
+ "that the Java bootclasspath be configured with Jetty ALPN boot.",
|
||||
SocketAddressValidator.INET),
|
||||
NETTY_EPOLL(true, "The Netty native EPOLL transport. Using this with TLS requires that "
|
||||
+ "OpenSSL be installed and configured as described in "
|
||||
+ "http://netty.io/wiki/forked-tomcat-native.html. Only supported on Linux.", INET),
|
||||
+ "http://netty.io/wiki/forked-tomcat-native.html. Only supported on Linux.",
|
||||
SocketAddressValidator.INET),
|
||||
NETTY_UNIX_DOMAIN_SOCKET(false, "The Netty Unix Domain Socket transport. This currently "
|
||||
+ "does not support TLS.", UDS);
|
||||
+ "does not support TLS.",
|
||||
SocketAddressValidator.UDS);
|
||||
|
||||
private final boolean tlsSupported;
|
||||
private final String description;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Copyright 2016, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.benchmarks.driver;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
import static junit.framework.TestCase.assertTrue;
|
||||
|
||||
import io.grpc.benchmarks.proto.Control;
|
||||
import io.grpc.benchmarks.proto.Stats;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
/**
|
||||
* Tests for {@link LoadClient}.
|
||||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class LoadClientTest {
|
||||
|
||||
@Test
|
||||
public void testHistogramToStatsConversion() throws Exception {
|
||||
double resolution = 1.01;
|
||||
double maxPossible = 10000.0;
|
||||
Control.ClientConfig.Builder config = Control.ClientConfig.newBuilder();
|
||||
config.getHistogramParamsBuilder().setMaxPossible(maxPossible)
|
||||
.setResolution(resolution - 1.0);
|
||||
config.getPayloadConfigBuilder().getSimpleParamsBuilder()
|
||||
.setReqSize(1)
|
||||
.setRespSize(1);
|
||||
config.setRpcType(Control.RpcType.UNARY);
|
||||
config.setClientType(Control.ClientType.SYNC_CLIENT);
|
||||
config.setClientChannels(1);
|
||||
config.setOutstandingRpcsPerChannel(1);
|
||||
config.getLoadParamsBuilder().getClosedLoopBuilder();
|
||||
config.addServerTargets("localhost:9999");
|
||||
|
||||
LoadClient loadClient = new LoadClient(config.build());
|
||||
loadClient.delay(1);
|
||||
loadClient.delay(10);
|
||||
loadClient.delay(10);
|
||||
loadClient.delay(100);
|
||||
loadClient.delay(100);
|
||||
loadClient.delay(100);
|
||||
loadClient.delay(1000);
|
||||
loadClient.delay(1000);
|
||||
loadClient.delay(1000);
|
||||
loadClient.delay(1000);
|
||||
|
||||
Stats.ClientStats stats = loadClient.getStats();
|
||||
|
||||
assertEquals(1.0, stats.getLatencies().getMinSeen());
|
||||
assertEquals(1000.0, stats.getLatencies().getMaxSeen());
|
||||
assertEquals(10.0, stats.getLatencies().getCount());
|
||||
|
||||
double base = 0;
|
||||
double logBase = 1;
|
||||
|
||||
for (int i = 0; i < stats.getLatencies().getBucketCount(); i++) {
|
||||
int bucketCount = stats.getLatencies().getBucket(i);
|
||||
if (base > 1.0 && base / resolution < 1.0) {
|
||||
assertEquals(1, bucketCount);
|
||||
} else if (base > 10.0 && base / resolution < 10.0) {
|
||||
assertEquals(2, bucketCount);
|
||||
} else if (base > 100.0 && base / resolution < 100.0) {
|
||||
assertEquals(3, bucketCount);
|
||||
} else if (base > 1000.0 && base / resolution < 1000.0) {
|
||||
assertEquals(4, bucketCount);
|
||||
} else {
|
||||
assertEquals(0, bucketCount);
|
||||
}
|
||||
logBase = logBase * resolution;
|
||||
base = logBase - 1;
|
||||
}
|
||||
assertTrue(base > 10000);
|
||||
assertTrue(base / resolution <= 10000);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,278 @@
|
|||
/*
|
||||
* Copyright 2016, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.benchmarks.driver;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import io.grpc.benchmarks.proto.Control;
|
||||
import io.grpc.benchmarks.proto.Stats;
|
||||
import io.grpc.benchmarks.proto.WorkerServiceGrpc;
|
||||
import io.grpc.internal.ManagedChannelImpl;
|
||||
import io.grpc.netty.NettyChannelBuilder;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import io.grpc.testing.TestUtils;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Basic tests for {@link io.grpc.benchmarks.driver.LoadWorker}
|
||||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class LoadWorkerTest {
|
||||
|
||||
|
||||
private static final int TIMEOUT = 5;
|
||||
private static final Control.ClientArgs MARK = Control.ClientArgs.newBuilder()
|
||||
.setMark(Control.Mark.newBuilder().setReset(true).build())
|
||||
.build();
|
||||
|
||||
private LoadWorker worker;
|
||||
private ManagedChannelImpl channel;
|
||||
private WorkerServiceGrpc.WorkerServiceStub workerServiceStub;
|
||||
private LinkedBlockingQueue<Stats.ClientStats> marksQueue;
|
||||
|
||||
@Before
|
||||
public void setup() throws Exception {
|
||||
int port = TestUtils.pickUnusedPort();
|
||||
worker = new LoadWorker(port, 0);
|
||||
worker.start();
|
||||
channel = NettyChannelBuilder.forAddress("localhost", port).usePlaintext(true).build();
|
||||
workerServiceStub = WorkerServiceGrpc.newStub(channel);
|
||||
marksQueue = new LinkedBlockingQueue<Stats.ClientStats>();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void runUnaryBlockingClosedLoop() throws Exception {
|
||||
Control.ServerArgs.Builder serverArgsBuilder = Control.ServerArgs.newBuilder();
|
||||
serverArgsBuilder.getSetupBuilder()
|
||||
.setServerType(Control.ServerType.ASYNC_SERVER)
|
||||
.setAsyncServerThreads(4)
|
||||
.setPort(0)
|
||||
.getPayloadConfigBuilder().getSimpleParamsBuilder().setRespSize(1000);
|
||||
int serverPort = startServer(serverArgsBuilder.build());
|
||||
|
||||
Control.ClientArgs.Builder clientArgsBuilder = Control.ClientArgs.newBuilder();
|
||||
String serverAddress = "localhost:" + serverPort;
|
||||
clientArgsBuilder.getSetupBuilder()
|
||||
.setClientType(Control.ClientType.SYNC_CLIENT)
|
||||
.setRpcType(Control.RpcType.UNARY)
|
||||
.setClientChannels(2)
|
||||
.setOutstandingRpcsPerChannel(2)
|
||||
.addServerTargets(serverAddress);
|
||||
clientArgsBuilder.getSetupBuilder().getPayloadConfigBuilder().getSimpleParamsBuilder()
|
||||
.setReqSize(1000)
|
||||
.setRespSize(1000);
|
||||
clientArgsBuilder.getSetupBuilder().getHistogramParamsBuilder()
|
||||
.setResolution(0.01)
|
||||
.setMaxPossible(60000000000.0);
|
||||
StreamObserver<Control.ClientArgs> clientObserver = startClient(clientArgsBuilder.build());
|
||||
assertWorkOccurred(clientObserver);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void runUnaryAsyncClosedLoop() throws Exception {
|
||||
Control.ServerArgs.Builder serverArgsBuilder = Control.ServerArgs.newBuilder();
|
||||
serverArgsBuilder.getSetupBuilder()
|
||||
.setServerType(Control.ServerType.ASYNC_SERVER)
|
||||
.setAsyncServerThreads(4)
|
||||
.setPort(0)
|
||||
.getPayloadConfigBuilder().getSimpleParamsBuilder().setRespSize(1000);
|
||||
int serverPort = startServer(serverArgsBuilder.build());
|
||||
|
||||
Control.ClientArgs.Builder clientArgsBuilder = Control.ClientArgs.newBuilder();
|
||||
String serverAddress = "localhost:" + serverPort;
|
||||
clientArgsBuilder.getSetupBuilder()
|
||||
.setClientType(Control.ClientType.ASYNC_CLIENT)
|
||||
.setClientChannels(2)
|
||||
.setRpcType(Control.RpcType.UNARY)
|
||||
.setOutstandingRpcsPerChannel(1)
|
||||
.setAsyncClientThreads(4)
|
||||
.addServerTargets(serverAddress);
|
||||
clientArgsBuilder.getSetupBuilder().getPayloadConfigBuilder().getSimpleParamsBuilder()
|
||||
.setReqSize(1000)
|
||||
.setRespSize(1000);
|
||||
clientArgsBuilder.getSetupBuilder().getHistogramParamsBuilder()
|
||||
.setResolution(0.01)
|
||||
.setMaxPossible(60000000000.0);
|
||||
StreamObserver<Control.ClientArgs> clientObserver = startClient(clientArgsBuilder.build());
|
||||
assertWorkOccurred(clientObserver);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void runPingPongAsyncClosedLoop() throws Exception {
|
||||
Control.ServerArgs.Builder serverArgsBuilder = Control.ServerArgs.newBuilder();
|
||||
serverArgsBuilder.getSetupBuilder()
|
||||
.setServerType(Control.ServerType.ASYNC_SERVER)
|
||||
.setAsyncServerThreads(4)
|
||||
.setPort(0)
|
||||
.getPayloadConfigBuilder().getSimpleParamsBuilder().setRespSize(1000);
|
||||
int serverPort = startServer(serverArgsBuilder.build());
|
||||
|
||||
Control.ClientArgs.Builder clientArgsBuilder = Control.ClientArgs.newBuilder();
|
||||
String serverAddress = "localhost:" + serverPort;
|
||||
clientArgsBuilder.getSetupBuilder()
|
||||
.setClientType(Control.ClientType.ASYNC_CLIENT)
|
||||
.setClientChannels(2)
|
||||
.setRpcType(Control.RpcType.STREAMING)
|
||||
.setOutstandingRpcsPerChannel(1)
|
||||
.setAsyncClientThreads(4)
|
||||
.addServerTargets(serverAddress);
|
||||
clientArgsBuilder.getSetupBuilder().getPayloadConfigBuilder().getSimpleParamsBuilder()
|
||||
.setReqSize(1000)
|
||||
.setRespSize(1000);
|
||||
clientArgsBuilder.getSetupBuilder().getHistogramParamsBuilder()
|
||||
.setResolution(0.01)
|
||||
.setMaxPossible(60000000000.0);
|
||||
StreamObserver<Control.ClientArgs> clientObserver = startClient(clientArgsBuilder.build());
|
||||
assertWorkOccurred(clientObserver);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void runGenericPingPongAsyncClosedLoop() throws Exception {
|
||||
Control.ServerArgs.Builder serverArgsBuilder = Control.ServerArgs.newBuilder();
|
||||
serverArgsBuilder.getSetupBuilder()
|
||||
.setServerType(Control.ServerType.ASYNC_GENERIC_SERVER)
|
||||
.setAsyncServerThreads(4)
|
||||
.setPort(0)
|
||||
.getPayloadConfigBuilder().getBytebufParamsBuilder().setReqSize(1000).setRespSize(1000);
|
||||
int serverPort = startServer(serverArgsBuilder.build());
|
||||
|
||||
Control.ClientArgs.Builder clientArgsBuilder = Control.ClientArgs.newBuilder();
|
||||
String serverAddress = "localhost:" + serverPort;
|
||||
clientArgsBuilder.getSetupBuilder()
|
||||
.setClientType(Control.ClientType.ASYNC_CLIENT)
|
||||
.setClientChannels(2)
|
||||
.setRpcType(Control.RpcType.STREAMING)
|
||||
.setOutstandingRpcsPerChannel(1)
|
||||
.setAsyncClientThreads(4)
|
||||
.addServerTargets(serverAddress);
|
||||
clientArgsBuilder.getSetupBuilder().getPayloadConfigBuilder().getBytebufParamsBuilder()
|
||||
.setReqSize(1000)
|
||||
.setRespSize(1000);
|
||||
clientArgsBuilder.getSetupBuilder().getHistogramParamsBuilder()
|
||||
.setResolution(0.01)
|
||||
.setMaxPossible(60000000000.0);
|
||||
StreamObserver<Control.ClientArgs> clientObserver = startClient(clientArgsBuilder.build());
|
||||
assertWorkOccurred(clientObserver);
|
||||
}
|
||||
|
||||
private void assertWorkOccurred(StreamObserver<Control.ClientArgs> clientObserver)
|
||||
throws InterruptedException {
|
||||
|
||||
Stats.ClientStats stat = null;
|
||||
for (int i = 0; i < 3; i++) {
|
||||
// Poll until we get some stats
|
||||
Thread.sleep(300);
|
||||
clientObserver.onNext(MARK);
|
||||
stat = marksQueue.poll(TIMEOUT, TimeUnit.SECONDS);
|
||||
if (stat == null) {
|
||||
fail("Did not receive stats");
|
||||
}
|
||||
if (stat.getLatencies().getCount() > 10) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
clientObserver.onCompleted();
|
||||
assertTrue(stat.hasLatencies());
|
||||
assertTrue(stat.getLatencies().getCount() < stat.getLatencies().getSum());
|
||||
double mean = stat.getLatencies().getSum() / stat.getLatencies().getCount();
|
||||
System.out.println("Mean " + mean + " us");
|
||||
assertTrue(mean > stat.getLatencies().getMinSeen());
|
||||
assertTrue(mean < stat.getLatencies().getMaxSeen());
|
||||
}
|
||||
|
||||
private StreamObserver<Control.ClientArgs> startClient(Control.ClientArgs clientArgs)
|
||||
throws InterruptedException {
|
||||
final CountDownLatch clientReady = new CountDownLatch(1);
|
||||
StreamObserver<Control.ClientArgs> clientObserver = workerServiceStub.runClient(
|
||||
new StreamObserver<Control.ClientStatus>() {
|
||||
@Override
|
||||
public void onNext(Control.ClientStatus value) {
|
||||
clientReady.countDown();
|
||||
if (value.hasStats()) {
|
||||
marksQueue.add(value.getStats());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
}
|
||||
});
|
||||
|
||||
// Start the client
|
||||
clientObserver.onNext(clientArgs);
|
||||
if (!clientReady.await(TIMEOUT, TimeUnit.SECONDS)) {
|
||||
fail("Client failed to start");
|
||||
}
|
||||
return clientObserver;
|
||||
}
|
||||
|
||||
private int startServer(Control.ServerArgs serverArgs) throws InterruptedException {
|
||||
final AtomicInteger serverPort = new AtomicInteger();
|
||||
final CountDownLatch serverReady = new CountDownLatch(1);
|
||||
StreamObserver<Control.ServerArgs> serverObserver =
|
||||
workerServiceStub.runServer(new StreamObserver<Control.ServerStatus>() {
|
||||
@Override
|
||||
public void onNext(Control.ServerStatus value) {
|
||||
serverPort.set(value.getPort());
|
||||
serverReady.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
}
|
||||
});
|
||||
// trigger server startup
|
||||
serverObserver.onNext(serverArgs);
|
||||
if (!serverReady.await(TIMEOUT, TimeUnit.SECONDS)) {
|
||||
fail("Server failed to start");
|
||||
}
|
||||
return serverPort.get();
|
||||
}
|
||||
}
|
||||
|
|
@ -125,8 +125,6 @@ subprojects {
|
|||
}
|
||||
libraries = [
|
||||
guava: 'com.google.guava:guava:19.0',
|
||||
// used to collect benchmark results
|
||||
hdrhistogram: 'org.hdrhistogram:HdrHistogram:2.1.4',
|
||||
hpack: 'com.twitter:hpack:0.10.1',
|
||||
jsr305: 'com.google.code.findbugs:jsr305:3.0.0',
|
||||
oauth_client: 'com.google.auth:google-auth-library-oauth2-http:0.3.0',
|
||||
|
|
@ -144,7 +142,11 @@ subprojects {
|
|||
// Test dependencies.
|
||||
junit: 'junit:junit:4.11',
|
||||
mockito: 'org.mockito:mockito-core:1.9.5',
|
||||
truth: 'com.google.truth:truth:0.28'
|
||||
truth: 'com.google.truth:truth:0.28',
|
||||
|
||||
// Benchmark dependencies
|
||||
hdrhistogram: 'org.hdrhistogram:HdrHistogram:2.1.8',
|
||||
math: 'org.apache.commons:commons-math3:3.6',
|
||||
]
|
||||
|
||||
// Determine the correct version of Jetty ALPN boot to use based
|
||||
|
|
|
|||
Loading…
Reference in New Issue