benchmarks: improve benchmarks recording and shutdown

The benchmarks today do not have a good way to record metrics with precision
or shutdown safely when the benchmark is over.  This change alters the
AbstractBenchmark class to return a latch that can be waited upon when ending
the benchmark.

Benchmarks also would accidentally request way too many messages from the
server by calling request(1) explicitly in addition to the implicit one
in the StreamObserver to Call adapter.  This change adds a few outstanding
requests, but otherwise keeps the request count bounded.

Additionally, benchmark calls would ignore errors, and just shutdown in such
cases.  This changes them to log the error and just wait for the benchmark to
complete.  In the successful case, the benchmark client notifies server by
halfClosing (via onCompleted) where it previously did not.  It is also
careful to only do this once.

Lastly, Benchmarks have been changes to enable and disable recording at exact
points in the benchmark method, rather than waiting for teardown to occur.
Also, recording begins inside the recording method, not in Setup.  JMH may
do other procressing before, between, and after iterations.
This commit is contained in:
Carl Mastrangelo 2016-07-22 11:16:42 -07:00
parent e276359f0e
commit bfd12e4a86
4 changed files with 102 additions and 25 deletions

View File

@ -65,17 +65,22 @@ import java.net.SocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Abstract base class for Netty end-to-end benchmarks.
*/
public abstract class AbstractBenchmark {
private static final Logger logger = Logger.getLogger(AbstractBenchmark.class.getName());
/**
* Standard message sizes.
*/
@ -429,35 +434,44 @@ public abstract class AbstractBenchmark {
* {@code done.get()} is true. Each completed call will increment the counter by the specified
* delta which benchmarks can use to measure messages per second or bandwidth.
*/
protected void startStreamingCalls(int callsPerChannel,
final AtomicLong counter,
final AtomicBoolean done,
final long counterDelta) {
protected CountDownLatch startStreamingCalls(int callsPerChannel, final AtomicLong counter,
final AtomicBoolean record, final AtomicBoolean done, final long counterDelta) {
final CountDownLatch latch = new CountDownLatch(callsPerChannel * channels.length);
for (final ManagedChannel channel : channels) {
for (int i = 0; i < callsPerChannel; i++) {
final ClientCall<ByteBuf, ByteBuf> streamingCall =
channel.newCall(pingPongMethod, CALL_OPTIONS);
final AtomicReference<StreamObserver<ByteBuf>> requestObserverRef =
new AtomicReference<StreamObserver<ByteBuf>>();
final AtomicBoolean ignoreMessages = new AtomicBoolean();
StreamObserver<ByteBuf> requestObserver = ClientCalls.asyncBidiStreamingCall(
streamingCall,
new StreamObserver<ByteBuf>() {
@Override
public void onNext(ByteBuf value) {
if (!done.get()) {
counter.addAndGet(counterDelta);
requestObserverRef.get().onNext(request.slice());
streamingCall.request(1);
if (done.get()) {
if (!ignoreMessages.getAndSet(true)) {
requestObserverRef.get().onCompleted();
}
return;
}
requestObserverRef.get().onNext(request.slice());
if (record.get()) {
counter.addAndGet(counterDelta);
}
// request is called automatically because the observer implicitly has auto
// inbound flow control
}
@Override
public void onError(Throwable t) {
done.set(true);
logger.log(Level.WARNING, "call error", t);
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
});
requestObserverRef.set(requestObserver);
@ -465,6 +479,7 @@ public abstract class AbstractBenchmark {
requestObserver.onNext(request.slice());
}
}
return latch;
}
/**
@ -472,50 +487,76 @@ public abstract class AbstractBenchmark {
* {@code done.get()} is true. Each completed call will increment the counter by the specified
* delta which benchmarks can use to measure messages per second or bandwidth.
*/
protected void startFlowControlledStreamingCalls(int callsPerChannel,
final AtomicLong counter,
final AtomicBoolean done,
final long counterDelta) {
protected CountDownLatch startFlowControlledStreamingCalls(int callsPerChannel,
final AtomicLong counter, final AtomicBoolean record, final AtomicBoolean done,
final long counterDelta) {
final CountDownLatch latch = new CountDownLatch(callsPerChannel * channels.length);
for (final ManagedChannel channel : channels) {
for (int i = 0; i < callsPerChannel; i++) {
final ClientCall<ByteBuf, ByteBuf> streamingCall =
channel.newCall(flowControlledStreaming, CALL_OPTIONS);
final AtomicReference<StreamObserver<ByteBuf>> requestObserverRef =
new AtomicReference<StreamObserver<ByteBuf>>();
final AtomicBoolean ignoreMessages = new AtomicBoolean();
StreamObserver<ByteBuf> requestObserver = ClientCalls.asyncBidiStreamingCall(
streamingCall,
new StreamObserver<ByteBuf>() {
@Override
public void onNext(ByteBuf value) {
if (!done.get()) {
counter.addAndGet(counterDelta);
streamingCall.request(1);
StreamObserver<ByteBuf> obs = requestObserverRef.get();
if (done.get()) {
if (!ignoreMessages.getAndSet(true)) {
obs.onCompleted();
}
return;
}
if (record.get()) {
counter.addAndGet(counterDelta);
}
// request is called automatically because the observer implicitly has auto
// inbound flow control
}
@Override
public void onError(Throwable t) {
done.set(true);
logger.log(Level.WARNING, "call error", t);
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
});
requestObserverRef.set(requestObserver);
// Add some outstanding requests to ensure the server is filling the connection
streamingCall.request(5);
requestObserver.onNext(request.slice());
}
}
return latch;
}
/**
* Shutdown all the client channels and then shutdown the server.
*/
protected void teardown() throws Exception {
logger.fine("shutting down channels");
for (ManagedChannel channel : channels) {
channel.shutdown();
}
server.shutdown().awaitTermination(5, TimeUnit.SECONDS);
logger.fine("shutting down server");
server.shutdown();
if (!server.awaitTermination(5, TimeUnit.SECONDS)) {
logger.warning("Failed to shutdown server");
}
logger.fine("server shut down");
for (ManagedChannel channel : channels) {
if (!channel.awaitTermination(1, TimeUnit.SECONDS)) {
logger.warning("Failed to shutdown client");
}
}
logger.fine("channels shut down");
}
}

View File

@ -41,8 +41,11 @@ import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
/**
* Benchmark measuring messages per second received from a streaming server. The server
@ -52,6 +55,9 @@ import java.util.concurrent.atomic.AtomicLong;
@Fork(1)
public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark {
private static final Logger logger =
Logger.getLogger(FlowControlledMessagesPerSecondBenchmark.class.getName());
@Param({"1", "2", "4"})
public int channelCount = 1;
@ -66,6 +72,8 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark
private static AtomicLong callCounter;
private AtomicBoolean completed;
private AtomicBoolean record;
private CountDownLatch latch;
/**
* Use an AuxCounter so we can measure that calls as they occur without consuming CPU
@ -100,7 +108,9 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark
channelCount);
callCounter = new AtomicLong();
completed = new AtomicBoolean();
startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, completed, 1);
record = new AtomicBoolean();
latch =
startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, record, completed, 1);
}
/**
@ -110,7 +120,10 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark
@TearDown(Level.Trial)
public void teardown() throws Exception {
completed.set(true);
Thread.sleep(5000);
if (!latch.await(5, TimeUnit.SECONDS)) {
logger.warning("Failed to shutdown all calls.");
}
super.teardown();
}
@ -120,8 +133,10 @@ public class FlowControlledMessagesPerSecondBenchmark extends AbstractBenchmark
*/
@Benchmark
public void stream(AdditionalCounters counters) throws Exception {
record.set(true);
// No need to do anything, just sleep here.
Thread.sleep(1001);
record.set(false);
}
/**

View File

@ -41,8 +41,11 @@ import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
/**
* Benchmark measuring messages per second using a set of permanently open duplex streams which
@ -51,6 +54,8 @@ import java.util.concurrent.atomic.AtomicLong;
@State(Scope.Benchmark)
@Fork(1)
public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark {
private static final Logger logger =
Logger.getLogger(StreamingPingPongsPerSecondBenchmark.class.getName());
@Param({"1", "2", "4", "8"})
public int channelCount = 1;
@ -60,6 +65,8 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark {
private static AtomicLong callCounter;
private AtomicBoolean completed;
private AtomicBoolean record;
private CountDownLatch latch;
/**
* Use an AuxCounter so we can measure that calls as they occur without consuming CPU
@ -94,7 +101,8 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark {
channelCount);
callCounter = new AtomicLong();
completed = new AtomicBoolean();
startStreamingCalls(maxConcurrentStreams, callCounter, completed, 1);
record = new AtomicBoolean();
latch = startStreamingCalls(maxConcurrentStreams, callCounter, record, completed, 1);
}
/**
@ -104,7 +112,9 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark {
@TearDown(Level.Trial)
public void teardown() throws Exception {
completed.set(true);
Thread.sleep(5000);
if (!latch.await(5, TimeUnit.SECONDS)) {
logger.warning("Failed to shutdown all calls.");
}
super.teardown();
}
@ -114,8 +124,10 @@ public class StreamingPingPongsPerSecondBenchmark extends AbstractBenchmark {
*/
@Benchmark
public void pingPong(AdditionalCounters counters) throws Exception {
record.set(true);
// No need to do anything, just sleep here.
Thread.sleep(1001);
record.set(false);
}
/**

View File

@ -41,6 +41,8 @@ import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -64,6 +66,8 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark {
private static AtomicLong callCounter;
private AtomicBoolean completed;
private AtomicBoolean record;
private CountDownLatch latch;
/**
* Use an AuxCounter so we can measure that calls as they occur without consuming CPU
@ -98,7 +102,8 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark {
1);
callCounter = new AtomicLong();
completed = new AtomicBoolean();
startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, completed,
record = new AtomicBoolean();
latch = startFlowControlledStreamingCalls(maxConcurrentStreams, callCounter, record, completed,
responseSize.bytes());
}
@ -109,7 +114,9 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark {
@TearDown(Level.Trial)
public void teardown() throws Exception {
completed.set(true);
Thread.sleep(5000);
if (!latch.await(5, TimeUnit.SECONDS)) {
System.err.println("Failed to shutdown all calls.");
}
super.teardown();
}
@ -118,8 +125,10 @@ public class StreamingResponseBandwidthBenchmark extends AbstractBenchmark {
*/
@Benchmark
public void stream(AdditionalCounters counters) throws Exception {
record.set(true);
// No need to do anything, just sleep here.
Thread.sleep(1001);
record.set(false);
}
/**