diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/Utils.java b/benchmarks/src/main/java/io/grpc/benchmarks/Utils.java index 36d91ba14e..b165457224 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/Utils.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/Utils.java @@ -33,11 +33,11 @@ package io.grpc.benchmarks; import static java.util.concurrent.ForkJoinPool.defaultForkJoinWorkerThreadFactory; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.UncaughtExceptionHandlers; import com.google.protobuf.ByteString; import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.grpc.Status; import io.grpc.benchmarks.proto.Messages; import io.grpc.benchmarks.proto.Messages.Payload; @@ -50,7 +50,6 @@ import io.grpc.netty.NettyChannelBuilder; import io.grpc.okhttp.OkHttpChannelBuilder; import io.grpc.okhttp.internal.Platform; import io.grpc.testing.TestUtils; -import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollDomainSocketChannel; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; @@ -60,6 +59,7 @@ import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslProvider; +import io.netty.util.concurrent.DefaultThreadFactory; import org.HdrHistogram.Histogram; @@ -70,10 +70,10 @@ import java.io.PrintStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.SocketAddress; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; import java.util.concurrent.ForkJoinWorkerThread; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; @@ -136,117 +136,140 @@ public final class Utils { } } - /** - * Create a {@link ManagedChannel} for the given parameters. - */ - public static ManagedChannel newClientChannel(Transport transport, SocketAddress address, - boolean tls, boolean testca, @Nullable String authorityOverride, boolean useDefaultCiphers, - int flowControlWindow, boolean directExecutor) throws IOException { - if (transport == Transport.OK_HTTP) { - InetSocketAddress addr = (InetSocketAddress) address; - OkHttpChannelBuilder builder = OkHttpChannelBuilder - .forAddress(addr.getHostName(), addr.getPort()); - if (directExecutor) { - builder.directExecutor(); - } - builder.negotiationType(tls ? io.grpc.okhttp.NegotiationType.TLS - : io.grpc.okhttp.NegotiationType.PLAINTEXT); - if (tls) { - SSLSocketFactory factory; - if (testca) { - builder.overrideAuthority( - GrpcUtil.authorityFromHostAndPort(authorityOverride, addr.getPort())); - try { - factory = TestUtils.newSslSocketFactoryForCa( - Platform.get().getProvider(), - TestUtils.loadCert("ca.pem")); - } catch (Exception e) { - throw new RuntimeException(e); - } - } else { - factory = (SSLSocketFactory) SSLSocketFactory.getDefault(); + private static OkHttpChannelBuilder newOkhttpClientChannel( + SocketAddress address, boolean tls, boolean testca, @Nullable String authorityOverride) { + InetSocketAddress addr = (InetSocketAddress) address; + OkHttpChannelBuilder builder = + OkHttpChannelBuilder.forAddress(addr.getHostName(), addr.getPort()); + if (tls) { + builder.negotiationType(io.grpc.okhttp.NegotiationType.TLS); + SSLSocketFactory factory; + if (testca) { + builder.overrideAuthority( + GrpcUtil.authorityFromHostAndPort(authorityOverride, addr.getPort())); + try { + factory = TestUtils.newSslSocketFactoryForCa( + Platform.get().getProvider(), + TestUtils.loadCert("ca.pem")); + } catch (Exception e) { + throw new RuntimeException(e); } - builder.sslSocketFactory(factory); + } else { + factory = (SSLSocketFactory) SSLSocketFactory.getDefault(); } - if (authorityOverride != null) { - builder.overrideAuthority(authorityOverride); + builder.sslSocketFactory(factory); + } else { + builder.negotiationType(io.grpc.okhttp.NegotiationType.PLAINTEXT); + } + return builder; + } + + private static NettyChannelBuilder newNettyClientChannel(Transport transport, + SocketAddress address, boolean tls, boolean testca, int flowControlWindow, + boolean useDefaultCiphers) throws IOException { + NettyChannelBuilder builder = + NettyChannelBuilder.forAddress(address).flowControlWindow(flowControlWindow); + if (tls) { + builder.negotiationType(NegotiationType.TLS); + SslContext sslContext = null; + if (testca) { + File cert = TestUtils.loadCert("ca.pem"); + SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient().trustManager(cert); + if (transport == Transport.NETTY_NIO) { + sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, SslProvider.JDK); + } else { + // Native transport with OpenSSL + sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL); + } + if (useDefaultCiphers) { + sslContextBuilder.ciphers(null); + } + sslContext = sslContextBuilder.build(); } - return builder.build(); + builder.sslContext(sslContext); + } else { + builder.negotiationType(NegotiationType.PLAINTEXT); } - // It's a Netty transport. - SslContext sslContext = null; - NegotiationType negotiationType = tls ? NegotiationType.TLS : NegotiationType.PLAINTEXT; - if (tls && testca) { - File cert = TestUtils.loadCert("ca.pem"); - SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient().trustManager(cert); - if (transport == Transport.NETTY_NIO) { - sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, SslProvider.JDK); - } else { - // Native transport with OpenSSL - sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL); - } - if (useDefaultCiphers) { - sslContextBuilder.ciphers(null); - } - sslContext = sslContextBuilder.build(); - } - final EventLoopGroup group; - final Class channelType; - ThreadFactory tf = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("ELG-%d") - .build(); + DefaultThreadFactory tf = new DefaultThreadFactory("client-elg-", true /*daemon */); switch (transport) { case NETTY_NIO: - group = new NioEventLoopGroup(0, tf); - channelType = NioSocketChannel.class; + builder + .eventLoopGroup(new NioEventLoopGroup(0, tf)) + .channelType(NioSocketChannel.class); break; case NETTY_EPOLL: // These classes only work on Linux. - group = new EpollEventLoopGroup(0, tf); - channelType = EpollSocketChannel.class; + builder + .eventLoopGroup(new EpollEventLoopGroup(0, tf)) + .channelType(EpollSocketChannel.class); break; case NETTY_UNIX_DOMAIN_SOCKET: // These classes only work on Linux. - group = new EpollEventLoopGroup(0, tf); - channelType = EpollDomainSocketChannel.class; + builder + .eventLoopGroup(new EpollEventLoopGroup(0, tf)) + .channelType(EpollDomainSocketChannel.class); break; default: // Should never get here. throw new IllegalArgumentException("Unsupported transport: " + transport); } - NettyChannelBuilder builder = NettyChannelBuilder - .forAddress(address) - .eventLoopGroup(group) - .channelType(channelType) - .negotiationType(negotiationType) - .sslContext(sslContext) - .flowControlWindow(flowControlWindow); - if (authorityOverride != null) { - builder.overrideAuthority(authorityOverride); - } - if (directExecutor) { - builder.directExecutor(); - } else { - // TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be - // put. Move it somewhere else, or remove it if no longer necessary. - // See: https://github.com/grpc/grpc-java/issues/2119 - builder.executor(new ForkJoinPool(Runtime.getRuntime().availableProcessors(), + return builder; + } + + private static ExecutorService clientExecutor; + + private static synchronized ExecutorService getExecutor() { + if (clientExecutor == null) { + clientExecutor = new ForkJoinPool( + Runtime.getRuntime().availableProcessors(), new ForkJoinWorkerThreadFactory() { final AtomicInteger num = new AtomicInteger(); @Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) { ForkJoinWorkerThread thread = defaultForkJoinWorkerThreadFactory.newThread(pool); thread.setDaemon(true); - thread.setName("grpc-server-app-" + "-" + num.getAndIncrement()); + thread.setName("grpc-client-app-" + "-" + num.getAndIncrement()); return thread; } - }, UncaughtExceptionHandlers.systemExit(), true /* async */)); + }, UncaughtExceptionHandlers.systemExit(), true /* async */); } + return clientExecutor; + } + + /** + * Create a {@link ManagedChannel} for the given parameters. + */ + public static ManagedChannel newClientChannel(Transport transport, SocketAddress address, + boolean tls, boolean testca, @Nullable String authorityOverride, boolean useDefaultCiphers, + int flowControlWindow, boolean directExecutor) { + ManagedChannelBuilder builder; + if (transport == Transport.OK_HTTP) { + builder = newOkhttpClientChannel(address, tls, testca, authorityOverride); + } else { + try { + builder = newNettyClientChannel( + transport, address, tls, testca, flowControlWindow, useDefaultCiphers); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + if (authorityOverride != null) { + builder.overrideAuthority(authorityOverride); + } + + if (directExecutor) { + builder.directExecutor(); + } else { + // TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be + // put. Move it somewhere else, or remove it if no longer necessary. + // See: https://github.com/grpc/grpc-java/issues/2119 + builder.executor(getExecutor()); + } + return builder.build(); } diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadServer.java b/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadServer.java index ef572dcc2c..65f3ab4fa8 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadServer.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadServer.java @@ -60,7 +60,7 @@ import io.netty.buffer.PooledByteBufAllocator; import java.io.File; import java.lang.management.ManagementFactory; import java.util.List; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory; import java.util.concurrent.ForkJoinWorkerThread; @@ -165,7 +165,7 @@ final class LoadServer { } } - Executor getExecutor(int asyncThreads) { + ExecutorService getExecutor(int asyncThreads) { // TODO(carl-mastrangelo): This should not be necessary. I don't know where this should be // put. Move it somewhere else, or remove it if no longer necessary. // See: https://github.com/grpc/grpc-java/issues/2119