diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadServer.java b/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadServer.java index 837e2304e4..ef572dcc2c 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadServer.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadServer.java @@ -31,6 +31,10 @@ package io.grpc.benchmarks.driver; +import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory; + +import com.google.common.util.concurrent.UncaughtExceptionHandlers; + import com.sun.management.OperatingSystemMXBean; import io.grpc.Metadata; @@ -52,12 +56,15 @@ import io.grpc.stub.StreamObserver; import io.grpc.testing.TestUtils; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; -import io.netty.util.concurrent.DefaultThreadFactory; import java.io.File; import java.lang.management.ManagementFactory; import java.util.List; -import java.util.concurrent.Executors; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -110,8 +117,7 @@ final class LoadServer { // fully async. switch (config.getServerType()) { case ASYNC_SERVER: { - serverBuilder.executor(Executors.newFixedThreadPool(asyncThreads, - new DefaultThreadFactory("server-worker", true))); + serverBuilder.executor(getExecutor(asyncThreads)); break; } case SYNC_SERVER: { @@ -119,8 +125,7 @@ final class LoadServer { break; } case ASYNC_GENERIC_SERVER: { - serverBuilder.executor(Executors.newFixedThreadPool(asyncThreads, - new DefaultThreadFactory("server-worker", true))); + serverBuilder.executor(getExecutor(asyncThreads)); // Create buffers for the generic service PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT; genericResponse = alloc.buffer(config.getPayloadConfig().getBytebufParams().getRespSize()); @@ -160,6 +165,23 @@ final class LoadServer { } } + Executor getExecutor(int asyncThreads) { + // TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be + // put. Move it somewhere else, or remove it if no longer necessary. + // See: https://github.com/grpc/grpc-java/issues/2119 + return new ForkJoinPool(asyncThreads, + new ForkJoinWorkerThreadFactory() { + final AtomicInteger num = new AtomicInteger(); + @Override + public ForkJoinWorkerThread newThread(ForkJoinPool pool) { + ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool); + thread.setDaemon(true); + thread.setName("server-worker-" + "-" + num.getAndIncrement()); + return thread; + } + }, UncaughtExceptionHandlers.systemExit(), true /* async */); + } + int getPort() { return port; }