From bfd12e4a86bfdc8bf49507476fa974dafc3ef59e Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Fri, 22 Jul 2016 11:16:42 -0700 Subject: [PATCH] benchmarks: improve benchmarks recording and shutdown The benchmarks today do not have a good way to record metrics with precision or shutdown safely when the benchmark is over. This change alters the AbstractBenchmark class to return a latch that can be waited upon when ending the benchmark. Benchmarks also would accidentally request way too many messages from the server by calling request(1) explicitly in addition to the implicit one in the StreamObserver to Call adapter. This change adds a few outstanding requests, but otherwise keeps the request count bounded. Additionally, benchmark calls would ignore errors, and just shutdown in such cases. This changes them to log the error and just wait for the benchmark to complete. In the successful case, the benchmark client notifies server by halfClosing (via onCompleted) where it previously did not. It is also careful to only do this once. Lastly, Benchmarks have been changes to enable and disable recording at exact points in the benchmark method, rather than waiting for teardown to occur. Also, recording begins inside the recording method, not in Setup. JMH may do other procressing before, between, and after iterations. --- .../benchmarks/netty/AbstractBenchmark.java | 79 ++++++++++++++----- ...wControlledMessagesPerSecondBenchmark.java | 19 ++++- .../StreamingPingPongsPerSecondBenchmark.java | 16 +++- .../StreamingResponseBandwidthBenchmark.java | 13 ++- 4 files changed, 102 insertions(+), 25 deletions(-) diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java index c458c41624..8704e889c0 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java @@ -65,17 +65,22 @@ import java.net.SocketAddress; import java.net.SocketException; import java.net.UnknownHostException; import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Abstract base class for Netty end-to-end benchmarks. */ public abstract class AbstractBenchmark { + private static final Logger logger = Logger.getLogger(AbstractBenchmark.class.getName()); + /** * Standard message sizes. */ @@ -429,35 +434,44 @@ public abstract class AbstractBenchmark { * {@code done.get()} is true. Each completed call will increment the counter by the specified * delta which benchmarks can use to measure messages per second or bandwidth. */ - protected void startStreamingCalls(int callsPerChannel, - final AtomicLong counter, - final AtomicBoolean done, - final long counterDelta) { + protected CountDownLatch startStreamingCalls(int callsPerChannel, final AtomicLong counter, + final AtomicBoolean record, final AtomicBoolean done, final long counterDelta) { + final CountDownLatch latch = new CountDownLatch(callsPerChannel * channels.length); for (final ManagedChannel channel : channels) { for (int i = 0; i < callsPerChannel; i++) { final ClientCall streamingCall = channel.newCall(pingPongMethod, CALL_OPTIONS); final AtomicReference> requestObserverRef = new AtomicReference>(); + final AtomicBoolean ignoreMessages = new AtomicBoolean(); StreamObserver requestObserver = ClientCalls.asyncBidiStreamingCall( streamingCall, new StreamObserver() { @Override public void onNext(ByteBuf value) { - if (!done.get()) { - counter.addAndGet(counterDelta); - requestObserverRef.get().onNext(request.slice()); - streamingCall.request(1); + if (done.get()) { + if (!ignoreMessages.getAndSet(true)) { + requestObserverRef.get().onCompleted(); + } + return; } + requestObserverRef.get().onNext(request.slice()); + if (record.get()) { + counter.addAndGet(counterDelta); + } + // request is called automatically because the observer implicitly has auto + // inbound flow control } @Override public void onError(Throwable t) { - done.set(true); + logger.log(Level.WARNING, "call error", t); + latch.countDown(); } @Override public void onCompleted() { + latch.countDown(); } }); requestObserverRef.set(requestObserver); @@ -465,6 +479,7 @@ public abstract class AbstractBenchmark { requestObserver.onNext(request.slice()); } } + return latch; } /** @@ -472,50 +487,76 @@ public abstract class AbstractBenchmark { * {@code done.get()} is true. Each completed call will increment the counter by the specified * delta which benchmarks can use to measure messages per second or bandwidth. */ - protected void startFlowControlledStreamingCalls(int callsPerChannel, - final AtomicLong counter, - final AtomicBoolean done, - final long counterDelta) { + protected CountDownLatch startFlowControlledStreamingCalls(int callsPerChannel, + final AtomicLong counter, final AtomicBoolean record, final AtomicBoolean done, + final long counterDelta) { + final CountDownLatch latch = new CountDownLatch(callsPerChannel * channels.length); for (final ManagedChannel channel : channels) { for (int i = 0; i < callsPerChannel; i++) { final ClientCall streamingCall = channel.newCall(flowControlledStreaming, CALL_OPTIONS); final AtomicReference> requestObserverRef = new AtomicReference>(); + final AtomicBoolean ignoreMessages = new AtomicBoolean(); StreamObserver requestObserver = ClientCalls.asyncBidiStreamingCall( streamingCall, new StreamObserver() { @Override public void onNext(ByteBuf value) { - if (!done.get()) { - counter.addAndGet(counterDelta); - streamingCall.request(1); + StreamObserver obs = requestObserverRef.get(); + if (done.get()) { + if (!ignoreMessages.getAndSet(true)) { + obs.onCompleted(); + } + return; } + if (record.get()) { + counter.addAndGet(counterDelta); + } + // request is called automatically because the observer implicitly has auto + // inbound flow control } @Override public void onError(Throwable t) { - done.set(true); + logger.log(Level.WARNING, "call error", t); + latch.countDown(); } @Override public void onCompleted() { + latch.countDown(); } }); requestObserverRef.set(requestObserver); + + // Add some outstanding requests to ensure the server is filling the connection + streamingCall.request(5); requestObserver.onNext(request.slice()); } } + return latch; } /** * Shutdown all the client channels and then shutdown the server. */ protected void teardown() throws Exception { + logger.fine("shutting down channels"); for (ManagedChannel channel : channels) { channel.shutdown(); } - server.shutdown().awaitTermination(5, TimeUnit.SECONDS); + logger.fine("shutting down server"); + server.shutdown(); + if (!server.awaitTermination(5, TimeUnit.SECONDS)) { + logger.warning("Failed to shutdown server"); + } + logger.fine("server shut down"); + for (ManagedChannel channel : channels) { + if (!channel.awaitTermination(1, TimeUnit.SECONDS)) { + logger.warning("Failed to shutdown client"); + } + } + logger.fine("channels shut down"); } - } diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java index 08ee906fb0..556334f675 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java @@ -41,8 +41,11 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; /** * Benchmark measuring messages per second received from a streaming server. The server @@ -52,6 +55,9 @@ import java.util.concurrent.atomic.AtomicLong; @Fork(1) public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark { + private static final Logger logger = + Logger.getLogger(FlowControlledMessagesPerSecondBenchmark.class.getName()); + @Param({"1", "2", "4"}) public int channelCount = 1; @@ -66,6 +72,8 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark private static AtomicLong callCounter; private AtomicBoolean completed; + private AtomicBoolean record; + private CountDownLatch latch; /** * Use an AuxCounter so we can measure that calls as they occur without consuming CPU @@ -100,7 +108,9 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark channelCount); callCounter = new AtomicLong(); completed = new AtomicBoolean(); - startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, completed, 1); + record = new AtomicBoolean(); + latch = + startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, record, completed, 1); } /** @@ -110,7 +120,10 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark @TearDown(Level.Trial) public void teardown() throws Exception { completed.set(true); - Thread.sleep(5000); + if (!latch.await(5, TimeUnit.SECONDS)) { + logger.warning("Failed to shutdown all calls."); + } + super.teardown(); } @@ -120,8 +133,10 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark */ @Benchmark public void stream(AdditionalCounters counters) throws Exception { + record.set(true); // No need to do anything, just sleep here. Thread.sleep(1001); + record.set(false); } /** diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java index e81ec49e9a..9dceb6c10e 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java @@ -41,8 +41,11 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; /** * Benchmark measuring messages per second using a set of permanently open duplex streams which @@ -51,6 +54,8 @@ import java.util.concurrent.atomic.AtomicLong; @State(Scope.Benchmark) @Fork(1) public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark { + private static final Logger logger = + Logger.getLogger(StreamingPingPongsPerSecondBenchmark.class.getName()); @Param({"1", "2", "4", "8"}) public int channelCount = 1; @@ -60,6 +65,8 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark { private static AtomicLong callCounter; private AtomicBoolean completed; + private AtomicBoolean record; + private CountDownLatch latch; /** * Use an AuxCounter so we can measure that calls as they occur without consuming CPU @@ -94,7 +101,8 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark { channelCount); callCounter = new AtomicLong(); completed = new AtomicBoolean(); - startStreamingCalls(maxConcurrentStreams, callCounter, completed, 1); + record = new AtomicBoolean(); + latch = startStreamingCalls(maxConcurrentStreams, callCounter, record, completed, 1); } /** @@ -104,7 +112,9 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark { @TearDown(Level.Trial) public void teardown() throws Exception { completed.set(true); - Thread.sleep(5000); + if (!latch.await(5, TimeUnit.SECONDS)) { + logger.warning("Failed to shutdown all calls."); + } super.teardown(); } @@ -114,8 +124,10 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark { */ @Benchmark public void pingPong(AdditionalCounters counters) throws Exception { + record.set(true); // No need to do anything, just sleep here. Thread.sleep(1001); + record.set(false); } /** diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java index 0faf894fc3..d7de0a452e 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java @@ -41,6 +41,8 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -64,6 +66,8 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark { private static AtomicLong callCounter; private AtomicBoolean completed; + private AtomicBoolean record; + private CountDownLatch latch; /** * Use an AuxCounter so we can measure that calls as they occur without consuming CPU @@ -98,7 +102,8 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark { 1); callCounter = new AtomicLong(); completed = new AtomicBoolean(); - startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, completed, + record = new AtomicBoolean(); + latch = startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, record, completed, responseSize.bytes()); } @@ -109,7 +114,9 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark { @TearDown(Level.Trial) public void teardown() throws Exception { completed.set(true); - Thread.sleep(5000); + if (!latch.await(5, TimeUnit.SECONDS)) { + System.err.println("Failed to shutdown all calls."); + } super.teardown(); } @@ -118,8 +125,10 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark { */ @Benchmark public void stream(AdditionalCounters counters) throws Exception { + record.set(true); // No need to do anything, just sleep here. Thread.sleep(1001); + record.set(false); } /**