From 7f55e8163a42a426a141fb11cd9ce8ed9b5b0d2e Mon Sep 17 00:00:00 2001 From: lryan Date: Thu, 4 Dec 2014 16:43:45 -0800 Subject: [PATCH] Allow use of a LocalChannel with Netty & HTTP2 Remove old in-process handling Update tests and benchmarks ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=81381986 --- .../transport/netty/Http2Negotiator.java | 22 +- .../transport/netty/NettyChannelBuilder.java | 7 +- .../transport/netty/NettyClientTransport.java | 29 ++- .../netty/NettyClientTransportFactory.java | 6 +- .../stubby/transport/netty/NettyServer.java | 35 +-- .../transport/netty/NettyServerBuilder.java | 19 +- .../transport/netty/NettyServerTransport.java | 6 +- .../net/stubby/testing/InProcessUtils.java | 222 ------------------ 8 files changed, 79 insertions(+), 267 deletions(-) delete mode 100644 testing/src/main/java/com/google/net/stubby/testing/InProcessUtils.java diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/Http2Negotiator.java b/netty/src/main/java/com/google/net/stubby/transport/netty/Http2Negotiator.java index 7d9bdea877..e41e128fde 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/Http2Negotiator.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/Http2Negotiator.java @@ -1,7 +1,6 @@ package com.google.net.stubby.transport.netty; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -10,7 +9,6 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpClientUpgradeHandler; @@ -58,7 +56,7 @@ public class Http2Negotiator { /** * Gets the {@link ChannelInitializer} for negotiating the protocol. */ - ChannelInitializer initializer(); + ChannelInitializer initializer(); void onConnected(Channel channel); @@ -90,9 +88,9 @@ public class Http2Negotiator { if (!installJettyTLSProtocolSelection(sslEngine, completeFuture, false)) { throw new IllegalStateException("NPN/ALPN extensions not installed"); } - final ChannelInitializer initializer = new ChannelInitializer() { + final ChannelInitializer initializer = new ChannelInitializer() { @Override - public void initChannel(final SocketChannel ch) throws Exception { + public void initChannel(final Channel ch) throws Exception { SslHandler sslHandler = new SslHandler(sslEngine, false); sslHandler.handshakeFuture().addListener( new GenericFutureListener>() { @@ -112,7 +110,7 @@ public class Http2Negotiator { return new Negotiation() { @Override - public ChannelInitializer initializer() { + public ChannelInitializer initializer() { return initializer; } @@ -138,9 +136,9 @@ public class Http2Negotiator { final HttpClientUpgradeHandler upgrader = new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000); final UpgradeCompletionHandler completionHandler = new UpgradeCompletionHandler(); - final ChannelInitializer initializer = new ChannelInitializer() { + final ChannelInitializer initializer = new ChannelInitializer() { @Override - public void initChannel(SocketChannel ch) throws Exception { + public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(upgrader); ch.pipeline().addLast(completionHandler); } @@ -148,7 +146,7 @@ public class Http2Negotiator { return new Negotiation() { @Override - public ChannelInitializer initializer() { + public ChannelInitializer initializer() { return initializer; } @@ -172,16 +170,16 @@ public class Http2Negotiator { * Create a "no-op" negotiation that simply assumes the protocol to already be negotiated. */ public static Negotiation plaintext(final ChannelHandler handler) { - final ChannelInitializer initializer = new ChannelInitializer() { + final ChannelInitializer initializer = new ChannelInitializer() { @Override - public void initChannel(SocketChannel ch) throws Exception { + public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(handler); } }; return new Negotiation() { private final SettableFuture completeFuture = SettableFuture.create(); @Override - public ChannelInitializer initializer() { + public ChannelInitializer initializer() { return initializer; } diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyChannelBuilder.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyChannelBuilder.java index a7b8527f5d..06bdc6e4d9 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyChannelBuilder.java @@ -9,13 +9,14 @@ import io.netty.channel.EventLoopGroup; import io.netty.handler.ssl.SslContext; import java.net.InetSocketAddress; +import java.net.SocketAddress; /** * Convenient class for building channels with the netty transport. */ public final class NettyChannelBuilder extends AbstractChannelBuilder { - private final InetSocketAddress serverAddress; + private final SocketAddress serverAddress; private NegotiationType negotiationType = NegotiationType.TLS; private EventLoopGroup userEventLoopGroup; @@ -24,7 +25,7 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder channelInitializer; + private final SocketAddress address; + private final ChannelInitializer channelInitializer; private final EventLoopGroup bossGroup; private final EventLoopGroup workerGroup; private Channel channel; - public NettyServer(ServerListener serverListener, int port, EventLoopGroup bossGroup, + public NettyServer(ServerListener serverListener, SocketAddress address, EventLoopGroup bossGroup, EventLoopGroup workerGroup) { - this(serverListener, port, bossGroup, workerGroup, null); + this(serverListener, address, bossGroup, workerGroup, null); } - public NettyServer(final ServerListener serverListener, int port, EventLoopGroup bossGroup, + public NettyServer(final ServerListener serverListener, SocketAddress address, + EventLoopGroup bossGroup, EventLoopGroup workerGroup, @Nullable final SslContext sslContext) { Preconditions.checkNotNull(bossGroup, "bossGroup"); Preconditions.checkNotNull(workerGroup, "workerGroup"); - Preconditions.checkArgument(port >= 0, "port must be positive"); - this.port = port; - this.channelInitializer = new ChannelInitializer() { + this.address = address; + this.channelInitializer = new ChannelInitializer() { @Override - public void initChannel(SocketChannel ch) throws Exception { + public void initChannel(Channel ch) throws Exception { NettyServerTransport transport = new NettyServerTransport(ch, serverListener, sslContext); transport.startAsync(); // TODO(user): Should we wait for transport shutdown before shutting down server? @@ -57,13 +60,17 @@ public class NettyServer extends AbstractService { protected void doStart() { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); - b.channel(NioServerSocketChannel.class); - b.option(SO_BACKLOG, 128); - b.childOption(SO_KEEPALIVE, true); + if (address instanceof LocalAddress) { + b.channel(LocalServerChannel.class); + } else { + b.channel(NioServerSocketChannel.class); + b.option(SO_BACKLOG, 128); + b.childOption(SO_KEEPALIVE, true); + } b.childHandler(channelInitializer); // Bind and start to accept incoming connections. - b.bind(port).addListener(new ChannelFutureListener() { + b.bind(address).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java index ba61afdef6..0b49a46fc9 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java @@ -7,6 +7,9 @@ import com.google.net.stubby.HandlerRegistry; import com.google.net.stubby.SharedResourceHolder; import com.google.net.stubby.transport.ServerListener; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + import io.netty.channel.EventLoopGroup; import io.netty.handler.ssl.SslContext; @@ -15,7 +18,7 @@ import io.netty.handler.ssl.SslContext; */ public final class NettyServerBuilder extends AbstractServerBuilder { - private final int port; + private final SocketAddress address; private EventLoopGroup userBossEventLoopGroup; private EventLoopGroup userWorkerEventLoopGroup; @@ -29,13 +32,21 @@ public final class NettyServerBuilder extends AbstractServerBuilder method, - final Metadata.Headers headers, - final ClientStreamListener clientListener) { - // Separate FIFO executor queues for work on the client and server - final SerializingExecutor serverWorkQueue = new SerializingExecutor(executor); - final SerializingExecutor clientWorkQueue = new SerializingExecutor(executor); - - final HandlerRegistry.Method resolvedMethod = handlers.lookupMethod("/" + method.getName()); - if (resolvedMethod == null) { - // Threading? - clientWorkQueue.execute(new Runnable() { - @Override - public void run() { - clientListener.closed(Status.UNIMPLEMENTED, new Metadata.Trailers()); - } - }); - return new NoOpClientStream(); - } - - @SuppressWarnings("rawtypes") - final ServerMethodDefinition serverMethod = resolvedMethod.getMethodDefinition(); - final AtomicBoolean cancelled = new AtomicBoolean(); - - // Implementation of ServerCall which delegates to the client listener. - @SuppressWarnings("rawtypes") - final ServerCall serverCall = new ServerCall() { - - @Override - public void sendHeaders(final Metadata.Headers headers) { - clientWorkQueue.execute(new Runnable() { - @Override - public void run() { - clientListener.headersRead(headers); - } - }); - } - - @Override - public void sendPayload(final Object payload) { - clientWorkQueue.execute(new Runnable() { - @Override - public void run() { - try { - // TODO(user): Consider adapting at the Channel layer on the client - // so we avoid serialization costs. - @SuppressWarnings("unchecked") - InputStream message = serverMethod.streamResponse(payload); - clientListener.messageRead(message, message.available()); - } catch (IOException ioe) { - close(Status.fromThrowable(ioe), new Metadata.Trailers()); - } - } - }); - } - - @Override - public void close(final Status status, final Metadata.Trailers trailers) { - clientWorkQueue.execute(new Runnable() { - @Override - public void run() { - clientListener.closed(status, trailers); - } - }); - - } - - @Override - public boolean isCancelled() { - return cancelled.get(); - } - }; - - // Get the listener from the service implementation - @SuppressWarnings({"rawtypes", "unchecked"}) - final ServerCall.Listener serverListener = - serverMethod.getServerCallHandler().startCall(method.getName(), - serverCall, headers); - - // Return implementation of ClientStream which delegates to the server listener. - return new ClientStream() { - - @Override - public void cancel() { - cancelled.set(true); - serverWorkQueue.execute(new Runnable() { - @Override - public void run() { - serverListener.onCancel(); - } - }); - } - - @Override - public void halfClose() { - serverWorkQueue.execute(new Runnable() { - @Override - public void run() { - serverListener.onHalfClose(); - } - }); - } - - @Override - public void writeMessage(final InputStream message, int length, - @Nullable final Runnable accepted) { - serverWorkQueue.execute(new Runnable() { - @SuppressWarnings("unchecked") - @Override - public void run() { - try { - serverListener.onPayload(serverMethod.parseRequest(message)); - } catch (RuntimeException re) { - serverCall.close(Status.fromThrowable(re), new Metadata.Trailers()); - } finally { - if (accepted != null) { - accepted.run(); - } - } - } - }); - } - - @Override - public void flush() { - // No-op - } - }; - } - - // Simple No-Op implementation of ClientStream - private static class NoOpClientStream implements ClientStream { - @Override - public void cancel() { - // No-op - } - - @Override - public void halfClose() { - // No-op - } - - @Override - public void writeMessage(InputStream message, int length, @Nullable Runnable accepted) { - } - - @Override - public void flush() { - } - } - } -}