diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java index c458c41624..8704e889c0 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java @@ -65,17 +65,22 @@ import java.net.SocketAddress; import java.net.SocketException; import java.net.UnknownHostException; import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Abstract base class for Netty end-to-end benchmarks. */ public abstract class AbstractBenchmark { + private static final Logger logger = Logger.getLogger(AbstractBenchmark.class.getName()); + /** * Standard message sizes. */ @@ -429,35 +434,44 @@ public abstract class AbstractBenchmark { * {@code done.get()} is true. Each completed call will increment the counter by the specified * delta which benchmarks can use to measure messages per second or bandwidth. */ - protected void startStreamingCalls(int callsPerChannel, - final AtomicLong counter, - final AtomicBoolean done, - final long counterDelta) { + protected CountDownLatch startStreamingCalls(int callsPerChannel, final AtomicLong counter, + final AtomicBoolean record, final AtomicBoolean done, final long counterDelta) { + final CountDownLatch latch = new CountDownLatch(callsPerChannel * channels.length); for (final ManagedChannel channel : channels) { for (int i = 0; i < callsPerChannel; i++) { final ClientCall streamingCall = channel.newCall(pingPongMethod, CALL_OPTIONS); final AtomicReference> requestObserverRef = new AtomicReference>(); + final AtomicBoolean ignoreMessages = new AtomicBoolean(); StreamObserver requestObserver = ClientCalls.asyncBidiStreamingCall( streamingCall, new StreamObserver() { @Override public void onNext(ByteBuf value) { - if (!done.get()) { - counter.addAndGet(counterDelta); - requestObserverRef.get().onNext(request.slice()); - streamingCall.request(1); + if (done.get()) { + if (!ignoreMessages.getAndSet(true)) { + requestObserverRef.get().onCompleted(); + } + return; } + requestObserverRef.get().onNext(request.slice()); + if (record.get()) { + counter.addAndGet(counterDelta); + } + // request is called automatically because the observer implicitly has auto + // inbound flow control } @Override public void onError(Throwable t) { - done.set(true); + logger.log(Level.WARNING, "call error", t); + latch.countDown(); } @Override public void onCompleted() { + latch.countDown(); } }); requestObserverRef.set(requestObserver); @@ -465,6 +479,7 @@ public abstract class AbstractBenchmark { requestObserver.onNext(request.slice()); } } + return latch; } /** @@ -472,50 +487,76 @@ public abstract class AbstractBenchmark { * {@code done.get()} is true. Each completed call will increment the counter by the specified * delta which benchmarks can use to measure messages per second or bandwidth. */ - protected void startFlowControlledStreamingCalls(int callsPerChannel, - final AtomicLong counter, - final AtomicBoolean done, - final long counterDelta) { + protected CountDownLatch startFlowControlledStreamingCalls(int callsPerChannel, + final AtomicLong counter, final AtomicBoolean record, final AtomicBoolean done, + final long counterDelta) { + final CountDownLatch latch = new CountDownLatch(callsPerChannel * channels.length); for (final ManagedChannel channel : channels) { for (int i = 0; i < callsPerChannel; i++) { final ClientCall streamingCall = channel.newCall(flowControlledStreaming, CALL_OPTIONS); final AtomicReference> requestObserverRef = new AtomicReference>(); + final AtomicBoolean ignoreMessages = new AtomicBoolean(); StreamObserver requestObserver = ClientCalls.asyncBidiStreamingCall( streamingCall, new StreamObserver() { @Override public void onNext(ByteBuf value) { - if (!done.get()) { - counter.addAndGet(counterDelta); - streamingCall.request(1); + StreamObserver obs = requestObserverRef.get(); + if (done.get()) { + if (!ignoreMessages.getAndSet(true)) { + obs.onCompleted(); + } + return; } + if (record.get()) { + counter.addAndGet(counterDelta); + } + // request is called automatically because the observer implicitly has auto + // inbound flow control } @Override public void onError(Throwable t) { - done.set(true); + logger.log(Level.WARNING, "call error", t); + latch.countDown(); } @Override public void onCompleted() { + latch.countDown(); } }); requestObserverRef.set(requestObserver); + + // Add some outstanding requests to ensure the server is filling the connection + streamingCall.request(5); requestObserver.onNext(request.slice()); } } + return latch; } /** * Shutdown all the client channels and then shutdown the server. */ protected void teardown() throws Exception { + logger.fine("shutting down channels"); for (ManagedChannel channel : channels) { channel.shutdown(); } - server.shutdown().awaitTermination(5, TimeUnit.SECONDS); + logger.fine("shutting down server"); + server.shutdown(); + if (!server.awaitTermination(5, TimeUnit.SECONDS)) { + logger.warning("Failed to shutdown server"); + } + logger.fine("server shut down"); + for (ManagedChannel channel : channels) { + if (!channel.awaitTermination(1, TimeUnit.SECONDS)) { + logger.warning("Failed to shutdown client"); + } + } + logger.fine("channels shut down"); } - } diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java index 08ee906fb0..556334f675 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/FlowControlledMessagesPerSecondBenchmark.java @@ -41,8 +41,11 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; /** * Benchmark measuring messages per second received from a streaming server. The server @@ -52,6 +55,9 @@ import java.util.concurrent.atomic.AtomicLong; @Fork(1) public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark { + private static final Logger logger = + Logger.getLogger(FlowControlledMessagesPerSecondBenchmark.class.getName()); + @Param({"1", "2", "4"}) public int channelCount = 1; @@ -66,6 +72,8 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark private static AtomicLong callCounter; private AtomicBoolean completed; + private AtomicBoolean record; + private CountDownLatch latch; /** * Use an AuxCounter so we can measure that calls as they occur without consuming CPU @@ -100,7 +108,9 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark channelCount); callCounter = new AtomicLong(); completed = new AtomicBoolean(); - startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, completed, 1); + record = new AtomicBoolean(); + latch = + startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, record, completed, 1); } /** @@ -110,7 +120,10 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark @TearDown(Level.Trial) public void teardown() throws Exception { completed.set(true); - Thread.sleep(5000); + if (!latch.await(5, TimeUnit.SECONDS)) { + logger.warning("Failed to shutdown all calls."); + } + super.teardown(); } @@ -120,8 +133,10 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark */ @Benchmark public void stream(AdditionalCounters counters) throws Exception { + record.set(true); // No need to do anything, just sleep here. Thread.sleep(1001); + record.set(false); } /** diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java index e81ec49e9a..9dceb6c10e 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingPingPongsPerSecondBenchmark.java @@ -41,8 +41,11 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; /** * Benchmark measuring messages per second using a set of permanently open duplex streams which @@ -51,6 +54,8 @@ import java.util.concurrent.atomic.AtomicLong; @State(Scope.Benchmark) @Fork(1) public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark { + private static final Logger logger = + Logger.getLogger(StreamingPingPongsPerSecondBenchmark.class.getName()); @Param({"1", "2", "4", "8"}) public int channelCount = 1; @@ -60,6 +65,8 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark { private static AtomicLong callCounter; private AtomicBoolean completed; + private AtomicBoolean record; + private CountDownLatch latch; /** * Use an AuxCounter so we can measure that calls as they occur without consuming CPU @@ -94,7 +101,8 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark { channelCount); callCounter = new AtomicLong(); completed = new AtomicBoolean(); - startStreamingCalls(maxConcurrentStreams, callCounter, completed, 1); + record = new AtomicBoolean(); + latch = startStreamingCalls(maxConcurrentStreams, callCounter, record, completed, 1); } /** @@ -104,7 +112,9 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark { @TearDown(Level.Trial) public void teardown() throws Exception { completed.set(true); - Thread.sleep(5000); + if (!latch.await(5, TimeUnit.SECONDS)) { + logger.warning("Failed to shutdown all calls."); + } super.teardown(); } @@ -114,8 +124,10 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark { */ @Benchmark public void pingPong(AdditionalCounters counters) throws Exception { + record.set(true); // No need to do anything, just sleep here. Thread.sleep(1001); + record.set(false); } /** diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java index 0faf894fc3..d7de0a452e 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/StreamingResponseBandwidthBenchmark.java @@ -41,6 +41,8 @@ import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -64,6 +66,8 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark { private static AtomicLong callCounter; private AtomicBoolean completed; + private AtomicBoolean record; + private CountDownLatch latch; /** * Use an AuxCounter so we can measure that calls as they occur without consuming CPU @@ -98,7 +102,8 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark { 1); callCounter = new AtomicLong(); completed = new AtomicBoolean(); - startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, completed, + record = new AtomicBoolean(); + latch = startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, record, completed, responseSize.bytes()); } @@ -109,7 +114,9 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark { @TearDown(Level.Trial) public void teardown() throws Exception { completed.set(true); - Thread.sleep(5000); + if (!latch.await(5, TimeUnit.SECONDS)) { + System.err.println("Failed to shutdown all calls."); + } super.teardown(); } @@ -118,8 +125,10 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark { */ @Benchmark public void stream(AdditionalCounters counters) throws Exception { + record.set(true); // No need to do anything, just sleep here. Thread.sleep(1001); + record.set(false); } /**