From daed6e01b156ce9437a0b4dfe4159983a2cf5f18 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Sat, 17 Feb 2018 22:51:06 -0800 Subject: [PATCH] benchmarks: Move message throughput benchmark to TransportBenchmark This replaces FlowControlledMessagesPerSecondBenchmark, except it does not avoid local flow control issues via request(5). If hacking in a request(5), this benchmark produces similar results (non-direct: 671k vs previously 641k msg/s). --- .../benchmarks/CancellableInterceptor.java | 43 ++++++ .../grpc/benchmarks/TransportBenchmark.java | 43 ++++++ ...wControlledMessagesPerSecondBenchmark.java | 136 ------------------ 3 files changed, 86 insertions(+), 136 deletions(-) create mode 100644 benchmarks/src/jmh/java/io/grpc/benchmarks/CancellableInterceptor.java delete mode 100644 benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/CancellableInterceptor.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/CancellableInterceptor.java new file mode 100644 index 0000000000..97d8183081 --- /dev/null +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/CancellableInterceptor.java @@ -0,0 +1,43 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.benchmarks; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.MethodDescriptor; + +/** Interceptor that lets you cancel the most recent call made. This class is not thread-safe. */ +class CancellableInterceptor implements ClientInterceptor { + private ClientCall call; + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + ClientCall call = next.newCall(method, callOptions); + this.call = call; + return call; + } + + public void cancel(String message, Throwable cause) { + if (call == null) { + throw new NullPointerException("No previous call"); + } + call.cancel(message, cause); + } +} diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java index 4f3f72eef7..c447f937bc 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/TransportBenchmark.java @@ -21,6 +21,8 @@ import static io.grpc.benchmarks.Utils.pickUnusedPort; import com.google.protobuf.ByteString; import io.grpc.ManagedChannel; import io.grpc.Server; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.benchmarks.proto.BenchmarkServiceGrpc; import io.grpc.benchmarks.proto.Messages.Payload; import io.grpc.benchmarks.proto.Messages.SimpleRequest; @@ -43,6 +45,7 @@ import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalServerChannel; import java.net.InetSocketAddress; +import java.util.Iterator; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; @@ -277,4 +280,44 @@ public class TransportBenchmark { throws InterruptedException { return state.pingPong(BYTE_THROUGHPUT_REQUEST); } + + @State(Scope.Thread) + public static class InfiniteStreamState { + private final CancellableInterceptor cancellableInterceptor = new CancellableInterceptor(); + private Iterator iter; + + @Setup + public void setUp(TransportBenchmark bench) { + iter = bench.stub + .withInterceptors(cancellableInterceptor) + .streamingFromServer(SimpleRequest.getDefaultInstance()); + } + + public SimpleResponse recv() throws InterruptedException { + return iter.next(); + } + + @TearDown + public void tearDown() throws InterruptedException { + cancellableInterceptor.cancel("Normal tear-down", null); + try { + // Need to drain the queue + while (iter.hasNext()) { + iter.next(); + } + } catch (StatusRuntimeException ex) { + if (!Status.Code.CANCELLED.equals(ex.getStatus().getCode())) { + throw ex; + } + } + } + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + @Threads(10) + public SimpleResponse streamingCallsMessageThroughput(InfiniteStreamState state) + throws InterruptedException { + return state.recv(); + } } diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java deleted file mode 100644 index 3be6fe9a8a..0000000000 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright 2015 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.benchmarks.netty; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Logger; -import org.openjdk.jmh.annotations.AuxCounters; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; - -/** - * Benchmark measuring messages per second received from a streaming server. The server - * is obeying outbound flow-control. - */ -@State(Scope.Benchmark) -@Fork(1) -public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark { - - private static final Logger logger = - Logger.getLogger(FlowControlledMessagesPerSecondBenchmark.class.getName()); - - @Param({"1", "2", "4"}) - public int channelCount = 1; - - @Param({"1", "2", "10", "100"}) - public int maxConcurrentStreams = 1; - - @Param - public ExecutorType clientExecutor = ExecutorType.DIRECT; - - @Param({"SMALL"}) - public MessageSize responseSize = MessageSize.SMALL; - - private static AtomicLong callCounter; - private AtomicBoolean completed; - private AtomicBoolean record; - private CountDownLatch latch; - - /** - * Use an AuxCounter so we can measure that calls as they occur without consuming CPU - * in the benchmark method. - */ - @AuxCounters - @State(Scope.Thread) - public static class AdditionalCounters { - - @Setup(Level.Iteration) - public void clean() { - callCounter.set(0); - } - - public long messagesPerSecond() { - return callCounter.get(); - } - } - - /** - * Setup with direct executors, small payloads and the default flow-control window. - */ - @Setup(Level.Trial) - public void setup() throws Exception { - super.setup(clientExecutor, - ExecutorType.DIRECT, - MessageSize.SMALL, - responseSize, - FlowWindowSize.MEDIUM, - ChannelType.NIO, - maxConcurrentStreams, - channelCount); - callCounter = new AtomicLong(); - completed = new AtomicBoolean(); - record = new AtomicBoolean(); - latch = - startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, record, completed, 1); - } - - /** - * Stop the running calls then stop the server and client channels. - */ - @Override - @TearDown(Level.Trial) - public void teardown() throws Exception { - completed.set(true); - if (!latch.await(5, TimeUnit.SECONDS)) { - logger.warning("Failed to shutdown all calls."); - } - - super.teardown(); - } - - /** - * Measure the rate of messages received. The calls are already running, we just observe a counter - * of received responses. - */ - @Benchmark - public void stream(AdditionalCounters counters) throws Exception { - record.set(true); - // No need to do anything, just sleep here. - Thread.sleep(1001); - record.set(false); - } - - /** - * Useful for triggering a subset of the benchmark in a profiler. - */ - public static void main(String[] argv) throws Exception { - FlowControlledMessagesPerSecondBenchmark bench = new FlowControlledMessagesPerSecondBenchmark(); - bench.setup(); - Thread.sleep(30000); - bench.teardown(); - System.exit(0); - } -}