Add benchmarks for:

- streaming ping-pong messages per second
- unary response bandwidth in megabits per second
- streaming response bandwidth in megabits per second
- streaming server that obeys outbound flow control

Author: Louis Ryan
Date:   2015-05-18 13:06:45 -07:00
Parent: 7c1dabab5b
Commit: eee86b4fbb
7 changed files with 694 additions and 25 deletions
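
Not part of the commit, but useful orientation: the sketch below shows how one of the benchmarks added here could be launched through the standard JMH runner API (each benchmark class also carries its own main() for running a subset under a profiler). The BenchmarkLauncher class name and the iteration counts are illustrative assumptions; only the Runner/OptionsBuilder calls are standard JMH.

package io.grpc.benchmarks.netty;

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

/** Hypothetical launcher; not part of this commit. */
public class BenchmarkLauncher {
  public static void main(String[] args) throws RunnerException {
    Options options = new OptionsBuilder()
        // The include pattern is a regex matched against fully qualified benchmark class names.
        .include(StreamingPingPongsPerSecondBenchmark.class.getSimpleName())
        .forks(1)
        .warmupIterations(5)
        .measurementIterations(5)
        .build();
    new Runner(options).run();
  }
}

JMH then prints the @AuxCounters values (for example pingPongsPerSecond) as secondary results alongside the primary score.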

io/grpc/benchmarks/netty/AbstractBenchmark.java

@@ -2,6 +2,7 @@ package io.grpc.benchmarks.netty;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Call;
import io.grpc.ChannelImpl;
import io.grpc.DeferredInputStream;
import io.grpc.Marshaller;
@@ -35,6 +36,7 @@ import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
@@ -63,7 +65,7 @@ public abstract class AbstractBenchmark {
* Standard flow-control window sizes.
*/
public enum FlowWindowSize {
- SMALL(16384), MEDIUM(65536), LARGE(1048576), JUMBO(16777216);
+ SMALL(16383), MEDIUM(65535), LARGE(1048575), JUMBO(16777215);
private final int bytes;
FlowWindowSize(int bytes) {
@@ -94,6 +96,8 @@ public abstract class AbstractBenchmark {
protected ByteBuf request;
protected ByteBuf response;
protected MethodDescriptor<ByteBuf, ByteBuf> unaryMethod;
private MethodDescriptor<ByteBuf, ByteBuf> pingPongMethod;
private MethodDescriptor<ByteBuf, ByteBuf> flowControlledStreaming;
protected ChannelImpl[] channels;
public AbstractBenchmark() {
@@ -103,13 +107,13 @@ public abstract class AbstractBenchmark {
* Initialize the environment for the executor.
*/
public void setup(ExecutorType clientExecutor,
ExecutorType serverExecutor,
PayloadSize requestSize,
PayloadSize responseSize,
FlowWindowSize windowSize,
ChannelType channelType,
int maxConcurrentStreams,
int channelCount) throws Exception {
NettyServerBuilder serverBuilder;
NettyChannelBuilder channelBuilder;
if (channelType == ChannelType.LOCAL) {
@@ -155,8 +159,20 @@ public abstract class AbstractBenchmark {
TimeUnit.SECONDS,
new ByteBufOutputMarshaller(),
new ByteBufOutputMarshaller());
pingPongMethod = MethodDescriptor.create(MethodType.DUPLEX_STREAMING,
"benchmark/pingPong",
5,
TimeUnit.SECONDS,
new ByteBufOutputMarshaller(),
new ByteBufOutputMarshaller());
flowControlledStreaming = MethodDescriptor.create(MethodType.DUPLEX_STREAMING,
"benchmark/flowControlledStreaming",
5,
TimeUnit.SECONDS,
new ByteBufOutputMarshaller(),
new ByteBufOutputMarshaller());
- // Server implementation of same method
+ // Server implementation of unary & streaming methods
serverBuilder.addService(
ServerServiceDefinition.builder("benchmark")
.addMethod("unary",
@@ -191,7 +207,87 @@
}
};
}
- }).build());
+ })
.addMethod("pingPong",
new ByteBufOutputMarshaller(),
new ByteBufOutputMarshaller(),
new ServerCallHandler<ByteBuf, ByteBuf>() {
@Override
public ServerCall.Listener<ByteBuf> startCall(String fullMethodName,
final ServerCall<ByteBuf> call,
Metadata.Headers headers) {
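// Ask the transport to deliver the first inbound message; later messages are requested one at a time from onPayload.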
call.request(1);
return new ServerCall.Listener<ByteBuf>() {
@Override
public void onPayload(ByteBuf payload) {
payload.release();
call.sendPayload(response.slice());
// Request next message
call.request(1);
}
@Override
public void onHalfClose() {
call.close(Status.OK, new Metadata.Trailers());
}
@Override
public void onCancel() {
}
@Override
public void onComplete() {
}
};
}
})
.addMethod("flowControlledStreaming",
new ByteBufOutputMarshaller(),
new ByteBufOutputMarshaller(),
new ServerCallHandler<ByteBuf, ByteBuf>() {
@Override
public ServerCall.Listener<ByteBuf> startCall(String fullMethodName,
final ServerCall<ByteBuf> call,
Metadata.Headers headers) {
call.request(1);
return new ServerCall.Listener<ByteBuf>() {
@Override
public void onPayload(ByteBuf payload) {
payload.release();
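// isReady() is true while more messages can be written without excessive internal buffering, i.e. while the outbound flow-control window has room.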
while (call.isReady()) {
call.sendPayload(response.slice());
}
// Request next message
call.request(1);
}
@Override
public void onHalfClose() {
call.close(Status.OK, new Metadata.Trailers());
}
@Override
public void onCancel() {
}
@Override
public void onComplete() {
}
@Override
public void onReady() {
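// Invoked once outbound capacity is available again; resume writing until flow control pushes back.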
while (call.isReady()) {
call.sendPayload(response.slice());
}
}
};
}
})
.build());
// Build and start the clients and servers
server = serverBuilder.build();
@@ -207,18 +303,19 @@
/**
* Start a continuously executing set of unary calls that will terminate when
- * {@code done.get()} is true. Each completed call will increment the counter
- * which benchmarks can check for progress.
+ * {@code done.get()} is true. Each completed call will increment the counter by the specified
+ * delta which benchmarks can use to measure QPS or bandwidth.
*/
protected void startUnaryCalls(int callsPerChannel,
final AtomicLong counter,
- final AtomicBoolean done) {
+ final AtomicBoolean done,
final long counterDelta) {
for (final ChannelImpl channel : channels) {
for (int i = 0; i < callsPerChannel; i++) {
StreamObserver<ByteBuf> observer = new StreamObserver<ByteBuf>() {
@Override
public void onValue(ByteBuf value) {
- counter.incrementAndGet();
+ counter.addAndGet(counterDelta);
}
@Override
@@ -239,7 +336,94 @@
}
}
- protected void stopChannelsAndServers() throws Exception {
/**
* Start a continuously executing set of duplex streaming ping-pong calls that will terminate when
* {@code done.get()} is true. Each received response will increment the counter by the specified
* delta which benchmarks can use to measure messages per second or bandwidth.
*/
protected void startStreamingCalls(int callsPerChannel,
final AtomicLong counter,
final AtomicBoolean done,
final long counterDelta) {
for (final ChannelImpl channel : channels) {
for (int i = 0; i < callsPerChannel; i++) {
final Call<ByteBuf, ByteBuf> streamingCall = channel.newCall(pingPongMethod);
final AtomicReference<StreamObserver<ByteBuf>> requestObserverRef =
new AtomicReference<StreamObserver<ByteBuf>>();
StreamObserver<ByteBuf> requestObserver = Calls.duplexStreamingCall(streamingCall,
new StreamObserver<ByteBuf>() {
@Override
public void onValue(ByteBuf value) {
if (!done.get()) {
counter.addAndGet(counterDelta);
requestObserverRef.get().onValue(request.slice());
streamingCall.request(1);
} else {
requestObserverRef.get().onCompleted();
}
}
@Override
public void onError(Throwable t) {
done.set(true);
}
@Override
public void onCompleted() {
}
});
requestObserverRef.set(requestObserver);
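// Prime the ping-pong loop with two requests so a message is always in flight.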
requestObserver.onValue(request.slice());
requestObserver.onValue(request.slice());
}
}
}
/**
* Start a continuously executing set of flow-controlled duplex streaming calls that will terminate
* when {@code done.get()} is true. Each received response will increment the counter by the specified
* delta which benchmarks can use to measure messages per second or bandwidth.
*/
protected void startFlowControlledStreamingCalls(int callsPerChannel,
final AtomicLong counter,
final AtomicBoolean done,
final long counterDelta) {
for (final ChannelImpl channel : channels) {
for (int i = 0; i < callsPerChannel; i++) {
final Call<ByteBuf, ByteBuf> streamingCall = channel.newCall(flowControlledStreaming);
final AtomicReference<StreamObserver<ByteBuf>> requestObserverRef =
new AtomicReference<StreamObserver<ByteBuf>>();
StreamObserver<ByteBuf> requestObserver = Calls.duplexStreamingCall(streamingCall,
new StreamObserver<ByteBuf>() {
@Override
public void onValue(ByteBuf value) {
if (!done.get()) {
counter.addAndGet(counterDelta);
streamingCall.request(1);
} else {
requestObserverRef.get().onCompleted();
}
}
@Override
public void onError(Throwable t) {
done.set(true);
}
@Override
public void onCompleted() {
}
});
requestObserverRef.set(requestObserver);
requestObserver.onValue(request.slice());
}
}
}
/**
* Shut down all the client channels and then shut down the server.
*/
protected void teardown() throws Exception {
for (ChannelImpl channel : channels) {
channel.shutdown();
}

io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java

@@ -0,0 +1,106 @@
package io.grpc.benchmarks.netty;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Benchmark measuring messages per second received from a streaming server. The server
* obeys outbound flow control.
*/
@State(Scope.Benchmark)
@Fork(1)
public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark {
@Param({"1", "2", "4"})
public int channelCount = 1;
@Param({"1", "2", "10", "100"})
public int maxConcurrentStreams = 1;
@Param
public ExecutorType clientExecutor = ExecutorType.DIRECT;
@Param({"SMALL"})
public PayloadSize responseSize = PayloadSize.SMALL;
private static AtomicLong callCounter;
private AtomicBoolean completed;
/**
* Use an AuxCounter so we can measure the calls as they occur without consuming CPU
* in the benchmark method.
*/
@AuxCounters
@State(Scope.Thread)
public static class AdditionalCounters {
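// JMH samples the public methods of this @AuxCounters state at the end of each iteration and reports them as secondary results.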
@Setup(Level.Iteration)
public void clean() {
callCounter.set(0);
}
public long messagesPerSecond() {
return callCounter.get();
}
}
/**
* Setup with direct executors, small payloads and the default flow-control window.
*/
@Setup(Level.Trial)
public void setup() throws Exception {
super.setup(clientExecutor,
ExecutorType.DIRECT,
PayloadSize.SMALL,
responseSize,
FlowWindowSize.MEDIUM,
ChannelType.NIO,
maxConcurrentStreams,
channelCount);
callCounter = new AtomicLong();
completed = new AtomicBoolean();
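// A counter delta of 1 per received response: the messagesPerSecond aux counter counts messages, not bytes.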
startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, completed, 1);
}
/**
* Stop the running calls then stop the server and client channels.
*/
@Override
@TearDown(Level.Trial)
public void teardown() throws Exception {
completed.set(true);
Thread.sleep(5000);
super.teardown();
}
/**
* Measure the rate of messages received. The calls are already running; we just observe a counter
* of received responses.
*/
@Benchmark
public void stream(AdditionalCounters counters) throws Exception {
// No need to do anything, just sleep here.
Thread.sleep(1001);
}
/**
* Useful for triggering a subset of the benchmark in a profiler.
*/
public static void main(String[] argv) throws Exception {
FlowControlledMessagesPerSecondBenchmark bench = new FlowControlledMessagesPerSecondBenchmark();
bench.setup();
Thread.sleep(30000);
bench.teardown();
System.exit(0);
}
}

io/grpc/benchmarks/netty/SingleThreadBlockingQpsBenchmark.java

@@ -0,0 +1,69 @@
package io.grpc.benchmarks.netty;
import io.grpc.stub.Calls;
import io.netty.buffer.Unpooled;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
/**
* Benchmark showing performance of a linear sequence of blocking calls in a single thread which
* is the worst case for throughput. It uses a single channel with small payloads and a
* medium flow-control window.
*/
@State(Scope.Benchmark)
@Fork(1)
public class SingleThreadBlockingQpsBenchmark extends AbstractBenchmark {
/**
* Setup with direct executors, small payloads and the default flow control window.
*/
@Setup(Level.Trial)
public void setup() throws Exception {
super.setup(ExecutorType.DIRECT,
ExecutorType.DIRECT,
PayloadSize.SMALL,
PayloadSize.SMALL,
FlowWindowSize.MEDIUM,
ChannelType.NIO,
1,
1);
}
/**
* Stop the server and client channels.
*/
@Override
@TearDown(Level.Trial)
public void teardown() throws Exception {
Thread.sleep(5000);
super.teardown();
}
/**
* Issue a unary call and wait for the response.
*/
@Benchmark
public void blockingUnary() throws Exception {
Calls.blockingUnaryCall(channels[0].newCall(unaryMethod), Unpooled.EMPTY_BUFFER);
}
/**
* Useful for triggering a subset of the benchmark in a profiler.
*/
public static void main(String[] argv) throws Exception {
SingleThreadBlockingQpsBenchmark bench = new SingleThreadBlockingQpsBenchmark();
bench.setup();
for (int i = 0; i < 10000; i++) {
bench.blockingUnary();
}
Thread.sleep(30000);
bench.teardown();
System.exit(0);
}
}

io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java

@@ -0,0 +1,100 @@
package io.grpc.benchmarks.netty;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Benchmark measuring messages per second using a set of permanently open duplex streams which
* ping-pong messages.
*/
@State(Scope.Benchmark)
@Fork(1)
public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark {
@Param({"1", "2", "4", "8"})
public int channelCount = 1;
@Param({"1", "10", "100", "1000"})
public int maxConcurrentStreams = 1;
private static AtomicLong callCounter;
private AtomicBoolean completed;
/**
* Use an AuxCounter so we can measure the calls as they occur without consuming CPU
* in the benchmark method.
*/
@AuxCounters
@State(Scope.Thread)
public static class AdditionalCounters {
@Setup(Level.Iteration)
public void clean() {
callCounter.set(0);
}
public long pingPongsPerSecond() {
return callCounter.get();
}
}
/**
* Setup with direct executors, small payloads and the default flow-control window.
*/
@Setup(Level.Trial)
public void setup() throws Exception {
super.setup(ExecutorType.DIRECT,
ExecutorType.DIRECT,
PayloadSize.SMALL,
PayloadSize.SMALL,
FlowWindowSize.MEDIUM,
ChannelType.NIO,
maxConcurrentStreams,
channelCount);
callCounter = new AtomicLong();
completed = new AtomicBoolean();
startStreamingCalls(maxConcurrentStreams, callCounter, completed, 1);
}
/**
* Stop the running calls then stop the server and client channels.
*/
@Override
@TearDown(Level.Trial)
public void teardown() throws Exception {
completed.set(true);
Thread.sleep(5000);
super.teardown();
}
/**
* Measure throughput of streaming ping-pong calls. The calls are already running; we just observe a counter
* of received responses.
*/
@Benchmark
public void pingPong(AdditionalCounters counters) throws Exception {
// No need to do anything, just sleep here.
Thread.sleep(1001);
}
/**
* Useful for triggering a subset of the benchmark in a profiler.
*/
public static void main(String[] argv) throws Exception {
StreamingPingPongsPerSecondBenchmark bench = new StreamingPingPongsPerSecondBenchmark();
bench.setup();
Thread.sleep(30000);
bench.teardown();
System.exit(0);
}
}

io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java

@@ -0,0 +1,104 @@
package io.grpc.benchmarks.netty;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Benchmark intended to test response bandwidth (reported in megabits per second) for streaming
* calls by permuting payload size and flow-control window against the number of concurrent calls.
* Async stubs are used to avoid context-switching overheads.
*/
@State(Scope.Benchmark)
@Fork(1)
public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark {
@Param({"1", "10"})
public int maxConcurrentStreams = 1;
@Param({"LARGE", "JUMBO"})
public PayloadSize responseSize = PayloadSize.JUMBO;
@Param({"MEDIUM", "LARGE", "JUMBO"})
public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM;
private static AtomicLong callCounter;
private AtomicBoolean completed;
/**
* Use an AuxCounter so we can measure the calls as they occur without consuming CPU
* in the benchmark method.
*/
@AuxCounters
@State(Scope.Thread)
public static class AdditionalCounters {
@Setup(Level.Iteration)
public void clean() {
callCounter.set(0);
}
public long megabitsPerSecond() {
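// callCounter accumulates response bytes; multiply by 8 for bits, then shift right 20 to divide by 2^20.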
return (callCounter.get() * 8) >> 20;
}
}
/**
* Setup with direct executors and one channel.
*/
@Setup(Level.Trial)
public void setup() throws Exception {
super.setup(ExecutorType.DIRECT,
ExecutorType.DIRECT,
PayloadSize.SMALL,
responseSize,
clientInboundFlowWindow,
ChannelType.NIO,
maxConcurrentStreams,
1);
callCounter = new AtomicLong();
completed = new AtomicBoolean();
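// Increment by responseSize.bytes() per message so megabitsPerSecond() can convert accumulated bytes to bandwidth.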
startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, completed,
responseSize.bytes());
}
/**
* Stop the running calls then stop the server and client channels.
*/
@Override
@TearDown(Level.Trial)
public void teardown() throws Exception {
completed.set(true);
Thread.sleep(5000);
super.teardown();
}
/**
* Measure bandwidth of streamed responses.
*/
@Benchmark
public void stream(AdditionalCounters counters) throws Exception {
// No need to do anything, just sleep here.
Thread.sleep(1001);
}
/**
* Useful for triggering a subset of the benchmark in a profiler.
*/
public static void main(String[] argv) throws Exception {
StreamingResponseBandwidthBenchmark bench = new StreamingResponseBandwidthBenchmark();
bench.setup();
Thread.sleep(30000);
bench.teardown();
System.exit(0);
}
}

io/grpc/benchmarks/netty/UnaryCallQpsBenchmark.java (renamed from MaxQpsBenchmark.java)

@@ -45,13 +45,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
- * Benchmark using configuration intended to allow maximum QPS.
+ * Benchmark using configuration intended to allow maximum QPS for unary calls.
*/
@State(Scope.Benchmark)
@Fork(1)
- public class MaxQpsBenchmark extends AbstractBenchmark {
+ public class UnaryCallQpsBenchmark extends AbstractBenchmark {
- @Param({"1", "2", "4", "8", "16", "32"})
+ @Param({"1", "2", "4", "8"})
public int channelCount = 4;
@Param({"10", "100", "1000"})
@@ -93,17 +93,18 @@ public class MaxQpsBenchmark extends AbstractBenchmark {
channelCount);
callCounter = new AtomicLong();
completed = new AtomicBoolean();
- startUnaryCalls(maxConcurrentStreams, callCounter, completed);
+ startUnaryCalls(maxConcurrentStreams, callCounter, completed, 1);
}
/**
* Stop the running calls then stop the server and client channels.
*/
@Override
@TearDown(Level.Trial)
- public void stopChannelsAndServers() throws Exception {
+ public void teardown() throws Exception {
completed.set(true);
Thread.sleep(5000);
- super.stopChannelsAndServers();
+ super.teardown();
}
/**
@@ -111,7 +112,7 @@
* of received responses.
*/
@Benchmark
- public void measureUnary(AdditionalCounters counters) throws Exception {
+ public void unary(AdditionalCounters counters) throws Exception {
// No need to do anything, just sleep here.
Thread.sleep(1001);
}
@@ -120,10 +121,10 @@
* Useful for triggering a subset of the benchmark in a profiler.
*/
public static void main(String[] argv) throws Exception {
- MaxQpsBenchmark bench = new MaxQpsBenchmark();
+ UnaryCallQpsBenchmark bench = new UnaryCallQpsBenchmark();
bench.setup();
Thread.sleep(30000);
- bench.stopChannelsAndServers();
+ bench.teardown();
System.exit(0);
}
}

io/grpc/benchmarks/netty/UnaryCallResponseBandwidthBenchmark.java

@@ -0,0 +1,105 @@
package io.grpc.benchmarks.netty;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Benchmark intended to test response bandwidth (reported in megabits per second) for unary calls
* by permuting payload size and flow-control window against the number of concurrent calls. Async
* stubs are used to avoid context-switching overheads.
*/
@State(Scope.Benchmark)
@Fork(1)
public class UnaryCallResponseBandwidthBenchmark extends AbstractBenchmark {
@Param({"1", "10"})
public int maxConcurrentStreams = 1;
@Param({"LARGE", "JUMBO"})
public PayloadSize responseSize = PayloadSize.JUMBO;
@Param({"MEDIUM", "LARGE", "JUMBO"})
public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM;
private static AtomicLong callCounter;
private AtomicBoolean completed;
/**
* Use an AuxCounter so we can measure the calls as they occur without consuming CPU
* in the benchmark method.
*/
@AuxCounters
@State(Scope.Thread)
public static class AdditionalCounters {
@Setup(Level.Iteration)
public void clean() {
callCounter.set(0);
}
public long megabitsPerSecond() {
// Convert bytes into megabits
return (callCounter.get() * 8) >> 20;
}
}
/**
* Setup with direct executors, small request payloads and a parameterized flow-control window.
*/
@Setup(Level.Trial)
public void setup() throws Exception {
super.setup(ExecutorType.DIRECT,
ExecutorType.DIRECT,
PayloadSize.SMALL,
responseSize,
clientInboundFlowWindow,
ChannelType.NIO,
maxConcurrentStreams,
1);
callCounter = new AtomicLong();
completed = new AtomicBoolean();
startUnaryCalls(maxConcurrentStreams, callCounter, completed, responseSize.bytes());
}
/**
* Stop the running calls then stop the server and client channels.
*/
@Override
@TearDown(Level.Trial)
public void teardown() throws Exception {
completed.set(true);
Thread.sleep(5000);
super.teardown();
}
/**
* Measure bandwidth of unary call responses. The calls are already running; we just observe a
* counter of received responses.
*/
@Benchmark
public void unary(AdditionalCounters counters) throws Exception {
// No need to do anything, just sleep here.
Thread.sleep(1001);
}
/**
* Useful for triggering a subset of the benchmark in a profiler.
*/
public static void main(String[] argv) throws Exception {
UnaryCallResponseBandwidthBenchmark bench = new UnaryCallResponseBandwidthBenchmark();
bench.setup();
Thread.sleep(30000);
bench.teardown();
System.exit(0);
}
}