diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java
index 500280d204..17cd836068 100644
--- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java
+++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java
@@ -2,6 +2,7 @@
 package io.grpc.benchmarks.netty;
 
 import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.Call;
 import io.grpc.ChannelImpl;
 import io.grpc.DeferredInputStream;
 import io.grpc.Marshaller;
@@ -35,6 +36,7 @@ import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nullable;
 
@@ -63,7 +65,7 @@ public abstract class AbstractBenchmark {
    * Standard flow-control window sizes.
    */
   public enum FlowWindowSize {
-    SMALL(16384), MEDIUM(65536), LARGE(1048576), JUMBO(16777216);
+    SMALL(16383), MEDIUM(65535), LARGE(1048575), JUMBO(16777215);
 
     private final int bytes;
     FlowWindowSize(int bytes) {
@@ -94,6 +96,8 @@ public abstract class AbstractBenchmark {
   protected ByteBuf request;
   protected ByteBuf response;
   protected MethodDescriptor<ByteBuf, ByteBuf> unaryMethod;
+  private MethodDescriptor<ByteBuf, ByteBuf> pingPongMethod;
+  private MethodDescriptor<ByteBuf, ByteBuf> flowControlledStreaming;
   protected ChannelImpl[] channels;
 
   public AbstractBenchmark() {
@@ -103,13 +107,13 @@ public abstract class AbstractBenchmark {
    * Initialize the environment for the executor.
    */
   public void setup(ExecutorType clientExecutor,
-      ExecutorType serverExecutor,
-      PayloadSize requestSize,
-      PayloadSize responseSize,
-      FlowWindowSize windowSize,
-      ChannelType channelType,
-      int maxConcurrentStreams,
-      int channelCount) throws Exception {
+                    ExecutorType serverExecutor,
+                    PayloadSize requestSize,
+                    PayloadSize responseSize,
+                    FlowWindowSize windowSize,
+                    ChannelType channelType,
+                    int maxConcurrentStreams,
+                    int channelCount) throws Exception {
     NettyServerBuilder serverBuilder;
     NettyChannelBuilder channelBuilder;
     if (channelType == ChannelType.LOCAL) {
@@ -155,8 +159,20 @@ public abstract class AbstractBenchmark {
         TimeUnit.SECONDS,
         new ByteBufOutputMarshaller(),
         new ByteBufOutputMarshaller());
+    pingPongMethod = MethodDescriptor.create(MethodType.DUPLEX_STREAMING,
+        "benchmark/pingPong",
+        5,
+        TimeUnit.SECONDS,
+        new ByteBufOutputMarshaller(),
+        new ByteBufOutputMarshaller());
+    flowControlledStreaming = MethodDescriptor.create(MethodType.DUPLEX_STREAMING,
+        "benchmark/flowControlledStreaming",
+        5,
+        TimeUnit.SECONDS,
+        new ByteBufOutputMarshaller(),
+        new ByteBufOutputMarshaller());
 
-    // Server implementation of same method
+    // Server implementation of unary & streaming methods
     serverBuilder.addService(
         ServerServiceDefinition.builder("benchmark")
             .addMethod("unary",
@@ -191,7 +207,87 @@ public abstract class AbstractBenchmark {
               }
             };
           }
-        }).build());
+        })
+            .addMethod("pingPong",
+                new ByteBufOutputMarshaller(),
+                new ByteBufOutputMarshaller(),
+                new ServerCallHandler<ByteBuf, ByteBuf>() {
+                  @Override
+                  public ServerCall.Listener<ByteBuf> startCall(String fullMethodName,
+                      final ServerCall<ByteBuf> call,
+                      Metadata.Headers headers) {
+                    call.request(1);
+                    return new ServerCall.Listener<ByteBuf>() {
+                      @Override
+                      public void onPayload(ByteBuf payload) {
+                        payload.release();
+                        call.sendPayload(response.slice());
+                        // Request next message
+                        call.request(1);
+                      }
+
+                      @Override
+                      public void onHalfClose() {
+                        call.close(Status.OK, new Metadata.Trailers());
+                      }
+
+                      @Override
+                      public void onCancel() {
+                      }
+
+                      @Override
+                      public void onComplete() {
+                      }
+                    };
+                  }
+                })
+            .addMethod("flowControlledStreaming",
+                new ByteBufOutputMarshaller(),
+                new ByteBufOutputMarshaller(),
+                new ServerCallHandler<ByteBuf, ByteBuf>() {
+                  @Override
+                  public ServerCall.Listener<ByteBuf> startCall(String fullMethodName,
+                      final ServerCall<ByteBuf> call,
+                      Metadata.Headers headers) {
+                    call.request(1);
+                    return new ServerCall.Listener<ByteBuf>() {
+                      @Override
+                      public void onPayload(ByteBuf payload) {
+                        payload.release();
+                        // Stream responses until the outbound flow-control window is exhausted.
+                        while (call.isReady()) {
+                          call.sendPayload(response.slice());
+                        }
+                        // Request next message
+                        call.request(1);
+                      }
+
+                      @Override
+                      public void onHalfClose() {
+                        call.close(Status.OK, new Metadata.Trailers());
+                      }
+
+                      @Override
+                      public void onCancel() {
+                      }
+
+                      @Override
+                      public void onComplete() {
+                      }
+
+                      @Override
+                      public void onReady() {
+                        // The client granted more flow-control window; resume streaming.
+                        while (call.isReady()) {
+                          call.sendPayload(response.slice());
+                        }
+                      }
+                    };
+                  }
+                })
+            .build());
 
     // Build and start the clients and servers
     server = serverBuilder.build();
@@ -207,18 +303,19 @@ public abstract class AbstractBenchmark {
 
   /**
    * Start a continuously executing set of unary calls that will terminate when
-   * {@code done.get()} is true. Each completed call will increment the counter
-   * which benchmarks can check for progress.
+   * {@code done.get()} is true. Each completed call will increment the counter by the specified
+   * delta, which benchmarks can use to measure QPS or bandwidth.
    */
   protected void startUnaryCalls(int callsPerChannel,
-      final AtomicLong counter,
-      final AtomicBoolean done) {
+                                 final AtomicLong counter,
+                                 final AtomicBoolean done,
+                                 final long counterDelta) {
     for (final ChannelImpl channel : channels) {
       for (int i = 0; i < callsPerChannel; i++) {
         StreamObserver<ByteBuf> observer = new StreamObserver<ByteBuf>() {
           @Override
           public void onValue(ByteBuf value) {
-            counter.incrementAndGet();
+            counter.addAndGet(counterDelta);
           }
 
           @Override
@@ -239,7 +336,94 @@ public abstract class AbstractBenchmark {
     }
   }
 
-  protected void stopChannelsAndServers() throws Exception {
+  /**
+   * Start a continuously executing set of duplex streaming ping-pong calls that will terminate
+   * when {@code done.get()} is true. Each received response will increment the counter by the
+   * specified delta, which benchmarks can use to measure messages per second or bandwidth.
+   */
+  protected void startStreamingCalls(int callsPerChannel,
+                                     final AtomicLong counter,
+                                     final AtomicBoolean done,
+                                     final long counterDelta) {
+    for (final ChannelImpl channel : channels) {
+      for (int i = 0; i < callsPerChannel; i++) {
+        final Call<ByteBuf, ByteBuf> streamingCall = channel.newCall(pingPongMethod);
+        final AtomicReference<StreamObserver<ByteBuf>> requestObserverRef =
+            new AtomicReference<StreamObserver<ByteBuf>>();
+        StreamObserver<ByteBuf> requestObserver = Calls.duplexStreamingCall(streamingCall,
+            new StreamObserver<ByteBuf>() {
+              @Override
+              public void onValue(ByteBuf value) {
+                if (!done.get()) {
+                  counter.addAndGet(counterDelta);
+                  requestObserverRef.get().onValue(request.slice());
+                  streamingCall.request(1);
+                } else {
+                  requestObserverRef.get().onCompleted();
+                }
+              }
+
+              @Override
+              public void onError(Throwable t) {
+                done.set(true);
+              }
+
+              @Override
+              public void onCompleted() {
+              }
+            });
+        requestObserverRef.set(requestObserver);
+        requestObserver.onValue(request.slice());
+        requestObserver.onValue(request.slice());
+      }
+    }
+  }
+
+  /**
+   * Start a continuously executing set of duplex streaming calls in which the server streams
+   * responses, subject to outbound flow control, until {@code done.get()} is true. Each received
+   * response will increment the counter by the specified delta, which benchmarks can use to
+   * measure messages per second or bandwidth.
+   */
+  protected void startFlowControlledStreamingCalls(int callsPerChannel,
+                                                   final AtomicLong counter,
+                                                   final AtomicBoolean done,
+                                                   final long counterDelta) {
+    for (final ChannelImpl channel : channels) {
+      for (int i = 0; i < callsPerChannel; i++) {
+        final Call<ByteBuf, ByteBuf> streamingCall = channel.newCall(flowControlledStreaming);
+        final AtomicReference<StreamObserver<ByteBuf>> requestObserverRef =
+            new AtomicReference<StreamObserver<ByteBuf>>();
+        StreamObserver<ByteBuf> requestObserver = Calls.duplexStreamingCall(streamingCall,
+            new StreamObserver<ByteBuf>() {
+              @Override
+              public void onValue(ByteBuf value) {
+                if (!done.get()) {
+                  counter.addAndGet(counterDelta);
+                  streamingCall.request(1);
+                } else {
+                  requestObserverRef.get().onCompleted();
+                }
+              }
+
+              @Override
+              public void onError(Throwable t) {
+                done.set(true);
+              }
+
+              @Override
+              public void onCompleted() {
+              }
+            });
+        requestObserverRef.set(requestObserver);
+        requestObserver.onValue(request.slice());
+      }
+    }
+  }
+
+  /**
+   * Shut down all the client channels and then shut down the server.
+   */
+  protected void teardown() throws Exception {
     for (ChannelImpl channel : channels) {
       channel.shutdown();
     }
diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java
new file mode 100644
index 0000000000..178608cfaf
--- /dev/null
+++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java
@@ -0,0 +1,106 @@
+package io.grpc.benchmarks.netty;
+
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Benchmark measuring messages per second received from a streaming server. The server
+ * obeys outbound flow control.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark {
+
+  @Param({"1", "2", "4"})
+  public int channelCount = 1;
+
+  @Param({"1", "2", "10", "100"})
+  public int maxConcurrentStreams = 1;
+
+  @Param
+  public ExecutorType clientExecutor = ExecutorType.DIRECT;
+
+  @Param({"SMALL"})
+  public PayloadSize responseSize = PayloadSize.SMALL;
+
+  private static AtomicLong callCounter;
+  private AtomicBoolean completed;
+
+  /**
+   * Use an AuxCounter so we can measure the calls as they occur without consuming CPU
+   * in the benchmark method.
+   */
+  @AuxCounters
+  @State(Scope.Thread)
+  public static class AdditionalCounters {
+
+    @Setup(Level.Iteration)
+    public void clean() {
+      callCounter.set(0);
+    }
+
+    public long messagesPerSecond() {
+      return callCounter.get();
+    }
+  }
+
+  /**
+   * Setup with direct executors, small payloads and the default flow-control window.
+   */
+  @Setup(Level.Trial)
+  public void setup() throws Exception {
+    super.setup(clientExecutor,
+        ExecutorType.DIRECT,
+        PayloadSize.SMALL,
+        responseSize,
+        FlowWindowSize.MEDIUM,
+        ChannelType.NIO,
+        maxConcurrentStreams,
+        channelCount);
+    callCounter = new AtomicLong();
+    completed = new AtomicBoolean();
+    startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, completed, 1);
+  }
+
+  /**
+   * Stop the running calls then stop the server and client channels.
+   */
+  @Override
+  @TearDown(Level.Trial)
+  public void teardown() throws Exception {
+    completed.set(true);
+    Thread.sleep(5000);
+    super.teardown();
+  }
+
+  /**
+   * Measure the rate of messages received. The calls are already running; we just observe a
+   * counter of received responses.
+   */
+  @Benchmark
+  public void stream(AdditionalCounters counters) throws Exception {
+    // No need to do anything, just sleep here.
+    Thread.sleep(1001);
+  }
+
+  /**
+   * Useful for triggering a subset of the benchmark in a profiler.
+   */
+  public static void main(String[] argv) throws Exception {
+    FlowControlledMessagesPerSecondBenchmark bench = new FlowControlledMessagesPerSecondBenchmark();
+    bench.setup();
+    Thread.sleep(30000);
+    bench.teardown();
+    System.exit(0);
+  }
+}
diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/SingleThreadBlockingQpsBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/SingleThreadBlockingQpsBenchmark.java
new file mode 100644
index 0000000000..b4982106de
--- /dev/null
+++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/SingleThreadBlockingQpsBenchmark.java
@@ -0,0 +1,69 @@
+package io.grpc.benchmarks.netty;
+
+import io.grpc.stub.Calls;
+import io.netty.buffer.Unpooled;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+
+/**
+ * Benchmark showing performance of a linear sequence of blocking calls in a single thread,
+ * which is the worst case for throughput.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+public class SingleThreadBlockingQpsBenchmark extends AbstractBenchmark {
+
+  /**
+   * Setup with direct executors, small payloads and the default flow-control window.
+   */
+  @Setup(Level.Trial)
+  public void setup() throws Exception {
+    super.setup(ExecutorType.DIRECT,
+        ExecutorType.DIRECT,
+        PayloadSize.SMALL,
+        PayloadSize.SMALL,
+        FlowWindowSize.MEDIUM,
+        ChannelType.NIO,
+        1,
+        1);
+  }
+
+  /**
+   * Stop the server and client channels.
+   */
+  @Override
+  @TearDown(Level.Trial)
+  public void teardown() throws Exception {
+    Thread.sleep(5000);
+    super.teardown();
+  }
+
+  /**
+   * Issue a unary call and wait for the response.
+   */
+  @Benchmark
+  public void blockingUnary() throws Exception {
+    Calls.blockingUnaryCall(channels[0].newCall(unaryMethod), Unpooled.EMPTY_BUFFER);
+  }
+
+  /**
+   * Useful for triggering a subset of the benchmark in a profiler.
+   */
+  public static void main(String[] argv) throws Exception {
+    SingleThreadBlockingQpsBenchmark bench = new SingleThreadBlockingQpsBenchmark();
+    bench.setup();
+    for (int i = 0; i < 10000; i++) {
+      bench.blockingUnary();
+    }
+    Thread.sleep(30000);
+    bench.teardown();
+    System.exit(0);
+  }
+}
diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java
new file mode 100644
index 0000000000..45726c2781
--- /dev/null
+++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java
@@ -0,0 +1,100 @@
+package io.grpc.benchmarks.netty;
+
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Benchmark measuring messages per second using a set of permanently open duplex streams which
+ * ping-pong messages.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark {
+
+  @Param({"1", "2", "4", "8"})
+  public int channelCount = 1;
+
+  @Param({"1", "10", "100", "1000"})
+  public int maxConcurrentStreams = 1;
+
+  private static AtomicLong callCounter;
+  private AtomicBoolean completed;
+
+  /**
+   * Use an AuxCounter so we can measure the calls as they occur without consuming CPU
+   * in the benchmark method.
+   */
+  @AuxCounters
+  @State(Scope.Thread)
+  public static class AdditionalCounters {
+
+    @Setup(Level.Iteration)
+    public void clean() {
+      callCounter.set(0);
+    }
+
+    public long pingPongsPerSecond() {
+      return callCounter.get();
+    }
+  }
+
+  /**
+   * Setup with direct executors, small payloads and the default flow-control window.
+   */
+  @Setup(Level.Trial)
+  public void setup() throws Exception {
+    super.setup(ExecutorType.DIRECT,
+        ExecutorType.DIRECT,
+        PayloadSize.SMALL,
+        PayloadSize.SMALL,
+        FlowWindowSize.MEDIUM,
+        ChannelType.NIO,
+        maxConcurrentStreams,
+        channelCount);
+    callCounter = new AtomicLong();
+    completed = new AtomicBoolean();
+    startStreamingCalls(maxConcurrentStreams, callCounter, completed, 1);
+  }
+
+  /**
+   * Stop the running calls then stop the server and client channels.
+   */
+  @Override
+  @TearDown(Level.Trial)
+  public void teardown() throws Exception {
+    completed.set(true);
+    Thread.sleep(5000);
+    super.teardown();
+  }
+
+  /**
+   * Measure throughput of ping-pong messages. The calls are already running; we just observe a
+   * counter of received responses.
+   */
+  @Benchmark
+  public void pingPong(AdditionalCounters counters) throws Exception {
+    // No need to do anything, just sleep here.
+    Thread.sleep(1001);
+  }
+
+  /**
+   * Useful for triggering a subset of the benchmark in a profiler.
+   */
+  public static void main(String[] argv) throws Exception {
+    StreamingPingPongsPerSecondBenchmark bench = new StreamingPingPongsPerSecondBenchmark();
+    bench.setup();
+    Thread.sleep(30000);
+    bench.teardown();
+    System.exit(0);
+  }
+}
diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java
new file mode 100644
index 0000000000..bfcd44e1a5
--- /dev/null
+++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java
@@ -0,0 +1,104 @@
+package io.grpc.benchmarks.netty;
+
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Benchmark intended to test response bandwidth in bytes/sec for streaming calls by permuting
+ * payload size and flow-control windows with number of concurrent calls. Async stubs are used
+ * to avoid context-switching overheads.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark {
+
+  @Param({"1", "10"})
+  public int maxConcurrentStreams = 1;
+
+  @Param({"LARGE", "JUMBO"})
+  public PayloadSize responseSize = PayloadSize.JUMBO;
+
+  @Param({"MEDIUM", "LARGE", "JUMBO"})
+  public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM;
+
+  private static AtomicLong callCounter;
+  private AtomicBoolean completed;
+
+  /**
+   * Use an AuxCounter so we can measure the calls as they occur without consuming CPU
+   * in the benchmark method.
+   */
+  @AuxCounters
+  @State(Scope.Thread)
+  public static class AdditionalCounters {
+
+    @Setup(Level.Iteration)
+    public void clean() {
+      callCounter.set(0);
+    }
+
+    public long megabitsPerSecond() {
+      // Convert bytes into megabits
+      return (callCounter.get() * 8) >> 20;
+    }
+  }
+
+  /**
+   * Setup with direct executors and one channel.
+   */
+  @Setup(Level.Trial)
+  public void setup() throws Exception {
+    super.setup(ExecutorType.DIRECT,
+        ExecutorType.DIRECT,
+        PayloadSize.SMALL,
+        responseSize,
+        clientInboundFlowWindow,
+        ChannelType.NIO,
+        maxConcurrentStreams,
+        1);
+    callCounter = new AtomicLong();
+    completed = new AtomicBoolean();
+    startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, completed,
+        responseSize.bytes());
+  }
+
+  /**
+   * Stop the running calls then stop the server and client channels.
+   */
+  @Override
+  @TearDown(Level.Trial)
+  public void teardown() throws Exception {
+    completed.set(true);
+    Thread.sleep(5000);
+    super.teardown();
+  }
+
+  /**
+   * Measure bandwidth of streamed responses.
+   */
+  @Benchmark
+  public void stream(AdditionalCounters counters) throws Exception {
+    // No need to do anything, just sleep here.
+    Thread.sleep(1001);
+  }
+
+  /**
+   * Useful for triggering a subset of the benchmark in a profiler.
+   */
+  public static void main(String[] argv) throws Exception {
+    StreamingResponseBandwidthBenchmark bench = new StreamingResponseBandwidthBenchmark();
+    bench.setup();
+    Thread.sleep(30000);
+    bench.teardown();
+    System.exit(0);
+  }
+}
diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/MaxQpsBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallQpsBenchmark.java
similarity index 90%
rename from benchmarks/src/jmh/java/io/grpc/benchmarks/netty/MaxQpsBenchmark.java
rename to benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallQpsBenchmark.java
index a680806f57..7e063cecb4 100644
--- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/MaxQpsBenchmark.java
+++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallQpsBenchmark.java
@@ -45,13 +45,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Benchmark using configuration intended to allow maximum QPS.
+ * Benchmark using configuration intended to allow maximum QPS for unary calls.
  */
 @State(Scope.Benchmark)
 @Fork(1)
-public class MaxQpsBenchmark extends AbstractBenchmark {
+public class UnaryCallQpsBenchmark extends AbstractBenchmark {
 
-  @Param({"1", "2", "4", "8", "16", "32"})
+  @Param({"1", "2", "4", "8"})
   public int channelCount = 4;
 
   @Param({"10", "100", "1000"})
@@ -93,17 +93,18 @@ public class UnaryCallQpsBenchmark extends AbstractBenchmark {
         channelCount);
     callCounter = new AtomicLong();
     completed = new AtomicBoolean();
-    startUnaryCalls(maxConcurrentStreams, callCounter, completed);
+    startUnaryCalls(maxConcurrentStreams, callCounter, completed, 1);
   }
 
   /**
    * Stop the running calls then stop the server and client channels.
    */
+  @Override
   @TearDown(Level.Trial)
-  public void stopChannelsAndServers() throws Exception {
+  public void teardown() throws Exception {
     completed.set(true);
     Thread.sleep(5000);
-    super.stopChannelsAndServers();
+    super.teardown();
   }
 
   /**
@@ -111,7 +112,7 @@ public class UnaryCallQpsBenchmark extends AbstractBenchmark {
    * of received responses.
    */
   @Benchmark
-  public void measureUnary(AdditionalCounters counters) throws Exception {
+  public void unary(AdditionalCounters counters) throws Exception {
     // No need to do anything, just sleep here.
     Thread.sleep(1001);
   }
@@ -120,10 +121,10 @@ public class UnaryCallQpsBenchmark extends AbstractBenchmark {
    * Useful for triggering a subset of the benchmark in a profiler.
    */
   public static void main(String[] argv) throws Exception {
-    MaxQpsBenchmark bench = new MaxQpsBenchmark();
+    UnaryCallQpsBenchmark bench = new UnaryCallQpsBenchmark();
     bench.setup();
     Thread.sleep(30000);
-    bench.stopChannelsAndServers();
+    bench.teardown();
     System.exit(0);
   }
 }
diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallResponseBandwidthBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallResponseBandwidthBenchmark.java
new file mode 100644
index 0000000000..118209bd79
--- /dev/null
+++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallResponseBandwidthBenchmark.java
@@ -0,0 +1,105 @@
+package io.grpc.benchmarks.netty;
+
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Benchmark intended to test response bandwidth in bytes/sec for unary calls by permuting
+ * payload size and flow-control windows with number of concurrent calls. Async stubs are used
+ * to avoid context-switching overheads.
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+public class UnaryCallResponseBandwidthBenchmark extends AbstractBenchmark {
+
+  @Param({"1", "10"})
+  public int maxConcurrentStreams = 1;
+
+  @Param({"LARGE", "JUMBO"})
+  public PayloadSize responseSize = PayloadSize.JUMBO;
+
+  @Param({"MEDIUM", "LARGE", "JUMBO"})
+  public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM;
+
+  private static AtomicLong callCounter;
+  private AtomicBoolean completed;
+
+  /**
+   * Use an AuxCounter so we can measure the calls as they occur without consuming CPU
+   * in the benchmark method.
+   */
+  @AuxCounters
+  @State(Scope.Thread)
+  public static class AdditionalCounters {
+
+    @Setup(Level.Iteration)
+    public void clean() {
+      callCounter.set(0);
+    }
+
+    public long megabitsPerSecond() {
+      // Convert bytes into megabits
+      return (callCounter.get() * 8) >> 20;
+    }
+  }
+
+  /**
+   * Setup with direct executors, small request payloads and the parameterized response size and
+   * flow-control window.
+   */
+  @Setup(Level.Trial)
+  public void setup() throws Exception {
+    super.setup(ExecutorType.DIRECT,
+        ExecutorType.DIRECT,
+        PayloadSize.SMALL,
+        responseSize,
+        clientInboundFlowWindow,
+        ChannelType.NIO,
+        maxConcurrentStreams,
+        1);
+    callCounter = new AtomicLong();
+    completed = new AtomicBoolean();
+    startUnaryCalls(maxConcurrentStreams, callCounter, completed, responseSize.bytes());
+  }
+
+  /**
+   * Stop the running calls then stop the server and client channels.
+   */
+  @Override
+  @TearDown(Level.Trial)
+  public void teardown() throws Exception {
+    completed.set(true);
+    Thread.sleep(5000);
+    super.teardown();
+  }
+
+  /**
+   * Measure bandwidth of unary call responses. The calls are already running; we just observe a
+   * counter of received responses.
+   */
+  @Benchmark
+  public void unary(AdditionalCounters counters) throws Exception {
+    // No need to do anything, just sleep here.
+    Thread.sleep(1001);
+  }
+
+  /**
+   * Useful for triggering a subset of the benchmark in a profiler.
+ */ + public static void main(String[] argv) throws Exception { + UnaryCallResponseBandwidthBenchmark bench = new UnaryCallResponseBandwidthBenchmark(); + bench.setup(); + Thread.sleep(30000); + bench.teardown(); + System.exit(0); + } +}
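
Note: each class above exposes a main() hook intended for profiling; for normal runs the benchmarks can also be driven programmatically through the standard JMH runner API. A minimal sketch follows (the BenchmarkRunner class is a hypothetical helper, not part of this change; it assumes the JMH runner artifacts are on the classpath):

    package io.grpc.benchmarks.netty;

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    /** Hypothetical helper (not part of this change): drives one of the new benchmarks via JMH. */
    public class BenchmarkRunner {
      public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
            // Regex match on the renamed unary QPS benchmark; swap in any of the other classes.
            .include(UnaryCallQpsBenchmark.class.getSimpleName())
            .forks(1)
            .build();
        new Runner(opt).run();
      }
    }

Unlike the main() hooks, this path runs the full @Param permutations and reports the @AuxCounters values, so messagesPerSecond, pingPongsPerSecond and megabitsPerSecond appear in the JMH results.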