diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/ThreadlessExecutor.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/ThreadlessExecutor.java new file mode 100644 index 0000000000..3c566e24ff --- /dev/null +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/ThreadlessExecutor.java @@ -0,0 +1,53 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.benchmarks; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * An {@link Executor} that owns no threads: submitted Runnables are queued and only run when a + * thread calls {@link #waitAndDrain()}, on that calling thread. + */ +final class ThreadlessExecutor implements Executor { + private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); + + /** + * Waits until there is a Runnable, then executes it and all queued Runnables after it.
+ * + * @throws InterruptedException if interrupted while waiting for the first Runnable + * @throws RuntimeException wrapping any Throwable thrown by a Runnable; draining stops there + */ + public void waitAndDrain() throws InterruptedException { + Runnable runnable = queue.take(); + while (runnable != null) { + try { + runnable.run(); + } catch (Throwable t) { + throw new RuntimeException("Runnable threw exception", t); + } + runnable = queue.poll(); + } + } + + /** Queues the Runnable without running it; a {@link #waitAndDrain()} caller runs it. */ + @Override + public void execute(Runnable runnable) { + queue.add(runnable); + } +} diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java index c4c7bb366c..4f3f72eef7 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java @@ -34,6 +34,7 @@ import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; import io.grpc.okhttp.OkHttpChannelBuilder; +import io.grpc.stub.StreamObserver; import io.netty.channel.Channel; import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.EventLoopGroup; @@ -47,12 +48,14 @@ import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; /** Some text.
*/ @State(Scope.Benchmark) @@ -69,6 +72,7 @@ public class TransportBenchmark { private ManagedChannel channel; private Server server; private BenchmarkServiceGrpc.BenchmarkServiceBlockingStub stub; + private BenchmarkServiceGrpc.BenchmarkServiceStub asyncStub; private EventLoopGroup groupToShutdown; @Setup @@ -157,6 +161,7 @@ public class TransportBenchmark { server.start(); channel = channelBuilder.build(); stub = BenchmarkServiceGrpc.newBlockingStub(channel); + asyncStub = BenchmarkServiceGrpc.newStub(channel); // Wait for channel to start stub.unaryCall(SimpleRequest.getDefaultInstance()); } @@ -182,7 +187,7 @@ public class TransportBenchmark { } } - private SimpleRequest simpleRequest = SimpleRequest.newBuilder() + private static final SimpleRequest UNARY_CALL_1024_REQUEST = SimpleRequest.newBuilder() .setResponseSize(1024) .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1024]))) .build(); @@ -190,7 +195,107 @@ public class TransportBenchmark { @Benchmark @BenchmarkMode(Mode.SampleTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) - public SimpleResponse unaryCall1024() { - return stub.unaryCall(simpleRequest); + public SimpleResponse unaryCall1024Latency() { + return stub.unaryCall(UNARY_CALL_1024_REQUEST); + } + + /** Response size the byte-throughput benchmarks request from the server (2^20). */ + private static final int BYTE_THROUGHPUT_RESPONSE_SIZE = 1048576; + private static final SimpleRequest BYTE_THROUGHPUT_REQUEST = SimpleRequest.newBuilder() + .setResponseSize(BYTE_THROUGHPUT_RESPONSE_SIZE) + .build(); + + /** + * Measures blocking unary calls issued with {@code BYTE_THROUGHPUT_REQUEST}. Each invocation + * counts as {@code BYTE_THROUGHPUT_RESPONSE_SIZE} operations, scaling the reported throughput. + */ + @Benchmark + @BenchmarkMode(Mode.Throughput) + @OperationsPerInvocation(BYTE_THROUGHPUT_RESPONSE_SIZE) + @Threads(10) + public SimpleResponse unaryCallsByteThroughput() { + return stub.unaryCall(BYTE_THROUGHPUT_REQUEST); + } + + /** Sentinel stored in {@code status} to mark a stream that completed without error. */ + private static final Throwable OK_THROWABLE = new RuntimeException("OK"); + + /** + * Per-thread state holding one open streaming call. Its callbacks are queued on a + * {@link ThreadlessExecutor} and run only inside {@code pingPong} and {@code tearDown}. + */ + @State(Scope.Thread) + public static class PingPongStreamState { + private final ThreadlessExecutor executor = new ThreadlessExecutor(); + private StreamObserver<SimpleRequest> requestObserver; + private SimpleResponse response;
+ private Throwable status; + + /** Opens the streaming call; its callbacks only record the response or terminal status. */ + @Setup + public void setUp(TransportBenchmark bench) { + requestObserver = bench.asyncStub + .withExecutor(executor) + .streamingCall(new StreamObserver<SimpleResponse>() { + @Override public void onNext(SimpleResponse next) { + assert response == null; + response = next; + } + + @Override public void onError(Throwable t) { + status = t; + } + + @Override public void onCompleted() { + status = OK_THROWABLE; + } + }); + } + + /** + * Issues request and waits for response. + * + * @throws RuntimeException if the stream terminates before a response arrives + */ + public SimpleResponse pingPong(SimpleRequest request) throws InterruptedException { + requestObserver.onNext(request); + while (true) { + executor.waitAndDrain(); + if (response != null) { + SimpleResponse savedResponse = response; + response = null; + return savedResponse; + } + if (status != null) { + throw new RuntimeException("Unexpected stream termination", status); + } + } + } + + /** + * Completes the request stream, then drains callbacks until a terminal status arrives. + * + * @throws RuntimeException if the stream ended with an error instead of completing + */ + @TearDown + public void tearDown() throws InterruptedException { + requestObserver.onCompleted(); + while (status == null) { + executor.waitAndDrain(); + } + if (status != OK_THROWABLE) { + throw new RuntimeException("Non-graceful stream shutdown", status); + } + } + } + + /** Measures one request/response exchange on the calling thread's stream per invocation. */ + @Benchmark + @BenchmarkMode(Mode.Throughput) + @OperationsPerInvocation(BYTE_THROUGHPUT_RESPONSE_SIZE) + @Threads(10) + public SimpleResponse streamingCallsByteThroughput(PingPongStreamState state) + throws InterruptedException { + return state.pingPong(BYTE_THROUGHPUT_REQUEST); } } diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java deleted file mode 100644 index fcd4dffcbc..0000000000 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright 2015 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.benchmarks.netty; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import org.openjdk.jmh.annotations.AuxCounters; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; - -/** - * Benchmark intended to test response bandwidth in bytes/sec for streaming calls by permuting - * payload size and flow-control windows with number of concurrent calls. Async stubs are used - * to avoid context-switching overheads. - */ -@State(Scope.Benchmark) -@Fork(1) -public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark { - - @Param({"1", "10"}) - public int maxConcurrentStreams = 1; - - @Param({"LARGE", "JUMBO"}) - public MessageSize responseSize = MessageSize.JUMBO; - - @Param({"MEDIUM", "LARGE", "JUMBO"}) - public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM; - - private static AtomicLong callCounter; - private AtomicBoolean completed; - private AtomicBoolean record; - private CountDownLatch latch; - - /** - * Use an AuxCounter so we can measure that calls as they occur without consuming CPU - * in the benchmark method. 
- */ - @AuxCounters - @State(Scope.Thread) - public static class AdditionalCounters { - - @Setup(Level.Iteration) - public void clean() { - callCounter.set(0); - } - - public long megabitsPerSecond() { - return (callCounter.get() * 8) >> 20; - } - } - - /** - * Setup with direct executors and one channel. - */ - @Setup(Level.Trial) - public void setup() throws Exception { - super.setup(ExecutorType.DIRECT, - ExecutorType.DIRECT, - MessageSize.SMALL, - responseSize, - clientInboundFlowWindow, - ChannelType.NIO, - maxConcurrentStreams, - 1); - callCounter = new AtomicLong(); - completed = new AtomicBoolean(); - record = new AtomicBoolean(); - latch = startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, record, completed, - responseSize.bytes()); - } - - /** - * Stop the running calls then stop the server and client channels. - */ - @Override - @TearDown(Level.Trial) - public void teardown() throws Exception { - completed.set(true); - if (!latch.await(5, TimeUnit.SECONDS)) { - System.err.println("Failed to shutdown all calls."); - } - super.teardown(); - } - - /** - * Measure bandwidth of streamed responses. - */ - @Benchmark - public void stream(AdditionalCounters counters) throws Exception { - record.set(true); - // No need to do anything, just sleep here. - Thread.sleep(1001); - record.set(false); - } - - /** - * Useful for triggering a subset of the benchmark in a profiler. 
- */ - public static void main(String[] argv) throws Exception { - StreamingResponseBandwidthBenchmark bench = new StreamingResponseBandwidthBenchmark(); - bench.setup(); - Thread.sleep(30000); - bench.teardown(); - System.exit(0); - } -} diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallResponseBandwidthBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallResponseBandwidthBenchmark.java deleted file mode 100644 index 80e0eec2b5..0000000000 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/UnaryCallResponseBandwidthBenchmark.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright 2015 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.benchmarks.netty; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import org.openjdk.jmh.annotations.AuxCounters; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; - -/** - * Benchmark intended to test response bandwidth in bytes/sec for unary calls by permuting - * payload size and flow-control windows with number of concurrent calls. 
Async stubs are used - * to avoid context-switching overheads. - */ -@State(Scope.Benchmark) -@Fork(1) -public class UnaryCallResponseBandwidthBenchmark extends AbstractBenchmark { - - @Param({"1", "10"}) - public int maxConcurrentStreams = 1; - - @Param({"LARGE", "JUMBO"}) - public MessageSize responseSize = MessageSize.JUMBO; - - @Param({"MEDIUM", "LARGE", "JUMBO"}) - public FlowWindowSize clientInboundFlowWindow = FlowWindowSize.MEDIUM; - - private static AtomicLong callCounter; - private AtomicBoolean completed; - - /** - * Use an AuxCounter so we can measure that calls as they occur without consuming CPU - * in the benchmark method. - */ - @AuxCounters - @State(Scope.Thread) - public static class AdditionalCounters { - - @Setup(Level.Iteration) - public void clean() { - callCounter.set(0); - } - - public long megabitsPerSecond() { - // Convert bytes into megabits - return (callCounter.get() * 8) >> 20; - } - } - - /** - * Setup with direct executors, small payloads and a large flow control window. - */ - @Setup(Level.Trial) - public void setup() throws Exception { - super.setup(ExecutorType.DIRECT, - ExecutorType.DIRECT, - MessageSize.SMALL, - responseSize, - clientInboundFlowWindow, - ChannelType.NIO, - maxConcurrentStreams, - 1); - callCounter = new AtomicLong(); - completed = new AtomicBoolean(); - startUnaryCalls(maxConcurrentStreams, callCounter, completed, responseSize.bytes()); - } - - /** - * Stop the running calls then stop the server and client channels. - */ - @Override - @TearDown(Level.Trial) - public void teardown() throws Exception { - completed.set(true); - Thread.sleep(5000); - super.teardown(); - } - - /** - * Measure bandwidth of unary call responses. The calls are already running, we just observe a - * counter of received responses. - */ - @Benchmark - public void unary(AdditionalCounters counters) throws Exception { - // No need to do anything, just sleep here. 
- Thread.sleep(1001); - } - - /** - * Useful for triggering a subset of the benchmark in a profiler. - */ - public static void main(String[] argv) throws Exception { - UnaryCallResponseBandwidthBenchmark bench = new UnaryCallResponseBandwidthBenchmark(); - bench.setup(); - Thread.sleep(30000); - bench.teardown(); - System.exit(0); - } -}