Allow use of a LocalChannel with Netty & HTTP2

Remove old in-process handling
Update tests and benchmarks

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=81381986
This commit is contained in:
lryan 2014-12-04 16:43:45 -08:00 committed by Eric Anderson
parent 9dd0c944c8
commit 7f55e8163a
8 changed files with 79 additions and 267 deletions

View File

@ -1,7 +1,6 @@
package com.google.net.stubby.transport.netty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@ -10,7 +9,6 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
@ -58,7 +56,7 @@ public class Http2Negotiator {
/**
* Gets the {@link ChannelInitializer} for negotiating the protocol.
*/
ChannelInitializer<SocketChannel> initializer();
ChannelInitializer<Channel> initializer();
void onConnected(Channel channel);
@ -90,9 +88,9 @@ public class Http2Negotiator {
if (!installJettyTLSProtocolSelection(sslEngine, completeFuture, false)) {
throw new IllegalStateException("NPN/ALPN extensions not installed");
}
final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
final ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
@Override
public void initChannel(final SocketChannel ch) throws Exception {
public void initChannel(final Channel ch) throws Exception {
SslHandler sslHandler = new SslHandler(sslEngine, false);
sslHandler.handshakeFuture().addListener(
new GenericFutureListener<Future<? super Channel>>() {
@ -112,7 +110,7 @@ public class Http2Negotiator {
return new Negotiation() {
@Override
public ChannelInitializer<SocketChannel> initializer() {
public ChannelInitializer<Channel> initializer() {
return initializer;
}
@ -138,9 +136,9 @@ public class Http2Negotiator {
final HttpClientUpgradeHandler upgrader =
new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
final UpgradeCompletionHandler completionHandler = new UpgradeCompletionHandler();
final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
final ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(upgrader);
ch.pipeline().addLast(completionHandler);
}
@ -148,7 +146,7 @@ public class Http2Negotiator {
return new Negotiation() {
@Override
public ChannelInitializer<SocketChannel> initializer() {
public ChannelInitializer<Channel> initializer() {
return initializer;
}
@ -172,16 +170,16 @@ public class Http2Negotiator {
* Create a "no-op" negotiation that simply assumes the protocol to already be negotiated.
*/
public static Negotiation plaintext(final ChannelHandler handler) {
final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
final ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(handler);
}
};
return new Negotiation() {
private final SettableFuture<Void> completeFuture = SettableFuture.create();
@Override
public ChannelInitializer<SocketChannel> initializer() {
public ChannelInitializer<Channel> initializer() {
return initializer;
}

View File

@ -9,13 +9,14 @@ import io.netty.channel.EventLoopGroup;
import io.netty.handler.ssl.SslContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
/**
* Convenient class for building channels with the netty transport.
*/
public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChannelBuilder> {
private final InetSocketAddress serverAddress;
private final SocketAddress serverAddress;
private NegotiationType negotiationType = NegotiationType.TLS;
private EventLoopGroup userEventLoopGroup;
@ -24,7 +25,7 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChann
/**
* Creates a new builder with the given server address.
*/
public static NettyChannelBuilder forAddress(InetSocketAddress serverAddress) {
public static NettyChannelBuilder forAddress(SocketAddress serverAddress) {
return new NettyChannelBuilder(serverAddress);
}
@ -35,7 +36,7 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChann
return forAddress(new InetSocketAddress(host, port));
}
private NettyChannelBuilder(InetSocketAddress serverAddress) {
private NettyChannelBuilder(SocketAddress serverAddress) {
this.serverAddress = serverAddress;
}

View File

@ -18,6 +18,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
@ -38,6 +40,7 @@ import io.netty.handler.ssl.SslContext;
import io.netty.util.internal.logging.InternalLogLevel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ExecutionException;
import javax.net.ssl.SSLEngine;
@ -49,7 +52,7 @@ import javax.net.ssl.SSLParameters;
*/
class NettyClientTransport extends AbstractClientTransport {
private final InetSocketAddress address;
private final SocketAddress address;
private final EventLoopGroup eventGroup;
private final Http2Negotiator.Negotiation negotiation;
private final NettyClientHandler handler;
@ -57,13 +60,23 @@ class NettyClientTransport extends AbstractClientTransport {
private final AsciiString authority;
private Channel channel;
NettyClientTransport(InetSocketAddress address, NegotiationType negotiationType,
NettyClientTransport(SocketAddress address, NegotiationType negotiationType,
EventLoopGroup eventGroup, SslContext sslContext) {
Preconditions.checkNotNull(negotiationType, "negotiationType");
this.address = Preconditions.checkNotNull(address, "address");
this.eventGroup = Preconditions.checkNotNull(eventGroup, "eventGroup");
authority = new AsciiString(address.getHostString() + ":" + address.getPort());
InetSocketAddress inetAddress = null;
if (address instanceof InetSocketAddress) {
inetAddress = (InetSocketAddress) address;
authority = new AsciiString(inetAddress.getHostString() + ":" + inetAddress.getPort());
} else if (address instanceof LocalAddress) {
authority = new AsciiString(address.toString());
Preconditions.checkArgument(negotiationType != NegotiationType.TLS,
"TLS not supported for in-process transport");
} else {
throw new IllegalStateException("Unknown socket address type " + address.toString());
}
handler = newHandler();
switch (negotiationType) {
@ -85,7 +98,7 @@ class NettyClientTransport extends AbstractClientTransport {
}
// TODO(user): specify allocator. The method currently ignores it though.
SSLEngine sslEngine
= sslContext.newEngine(null, address.getHostString(), address.getPort());
= sslContext.newEngine(null, inetAddress.getHostString(), inetAddress.getPort());
SSLParameters sslParams = new SSLParameters();
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
sslEngine.setSSLParameters(sslParams);
@ -127,8 +140,12 @@ class NettyClientTransport extends AbstractClientTransport {
protected void doStart() {
Bootstrap b = new Bootstrap();
b.group(eventGroup);
b.channel(NioSocketChannel.class);
b.option(SO_KEEPALIVE, true);
if (address instanceof LocalAddress) {
b.channel(LocalChannel.class);
} else {
b.channel(NioSocketChannel.class);
b.option(SO_KEEPALIVE, true);
}
b.handler(negotiation.initializer());
// Start the connection operation to the server.

View File

@ -6,19 +6,19 @@ import com.google.net.stubby.transport.ClientTransportFactory;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.ssl.SslContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
/**
* Factory that manufactures instances of {@link NettyClientTransport}.
*/
class NettyClientTransportFactory implements ClientTransportFactory {
private final InetSocketAddress address;
private final SocketAddress address;
private final NegotiationType negotiationType;
private final EventLoopGroup group;
private final SslContext sslContext;
public NettyClientTransportFactory(InetSocketAddress address, NegotiationType negotiationType,
public NettyClientTransportFactory(SocketAddress address, NegotiationType negotiationType,
EventLoopGroup group, SslContext sslContext) {
this.address = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");

View File

@ -7,13 +7,16 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import com.google.net.stubby.transport.ServerListener;
import java.net.SocketAddress;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
@ -24,26 +27,26 @@ import javax.annotation.Nullable;
* Netty-based server.
*/
public class NettyServer extends AbstractService {
private final int port;
private final ChannelInitializer<SocketChannel> channelInitializer;
private final SocketAddress address;
private final ChannelInitializer<Channel> channelInitializer;
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private Channel channel;
public NettyServer(ServerListener serverListener, int port, EventLoopGroup bossGroup,
public NettyServer(ServerListener serverListener, SocketAddress address, EventLoopGroup bossGroup,
EventLoopGroup workerGroup) {
this(serverListener, port, bossGroup, workerGroup, null);
this(serverListener, address, bossGroup, workerGroup, null);
}
public NettyServer(final ServerListener serverListener, int port, EventLoopGroup bossGroup,
public NettyServer(final ServerListener serverListener, SocketAddress address,
EventLoopGroup bossGroup,
EventLoopGroup workerGroup, @Nullable final SslContext sslContext) {
Preconditions.checkNotNull(bossGroup, "bossGroup");
Preconditions.checkNotNull(workerGroup, "workerGroup");
Preconditions.checkArgument(port >= 0, "port must be positive");
this.port = port;
this.channelInitializer = new ChannelInitializer<SocketChannel>() {
this.address = address;
this.channelInitializer = new ChannelInitializer<Channel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
public void initChannel(Channel ch) throws Exception {
NettyServerTransport transport = new NettyServerTransport(ch, serverListener, sslContext);
transport.startAsync();
// TODO(user): Should we wait for transport shutdown before shutting down server?
@ -57,13 +60,17 @@ public class NettyServer extends AbstractService {
protected void doStart() {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.option(SO_BACKLOG, 128);
b.childOption(SO_KEEPALIVE, true);
if (address instanceof LocalAddress) {
b.channel(LocalServerChannel.class);
} else {
b.channel(NioServerSocketChannel.class);
b.option(SO_BACKLOG, 128);
b.childOption(SO_KEEPALIVE, true);
}
b.childHandler(channelInitializer);
// Bind and start to accept incoming connections.
b.bind(port).addListener(new ChannelFutureListener() {
b.bind(address).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {

View File

@ -7,6 +7,9 @@ import com.google.net.stubby.HandlerRegistry;
import com.google.net.stubby.SharedResourceHolder;
import com.google.net.stubby.transport.ServerListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.ssl.SslContext;
@ -15,7 +18,7 @@ import io.netty.handler.ssl.SslContext;
*/
public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerBuilder> {
private final int port;
private final SocketAddress address;
private EventLoopGroup userBossEventLoopGroup;
private EventLoopGroup userWorkerEventLoopGroup;
@ -29,13 +32,21 @@ public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerB
return new NettyServerBuilder(registry, port);
}
public static NettyServerBuilder forAddress(SocketAddress address) {
return new NettyServerBuilder(address);
}
private NettyServerBuilder(int port) {
this.port = port;
this.address = new InetSocketAddress(port);
}
private NettyServerBuilder(HandlerRegistry registry, int port) {
super(registry);
this.port = port;
this.address = new InetSocketAddress(port);
}
private NettyServerBuilder(SocketAddress address) {
this.address = address;
}
/**
@ -81,7 +92,7 @@ public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerB
final EventLoopGroup workerEventLoopGroup = (userWorkerEventLoopGroup == null)
? SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP)
: userWorkerEventLoopGroup;
NettyServer server = new NettyServer(serverListener, port, bossEventLoopGroup,
NettyServer server = new NettyServer(serverListener, address, bossEventLoopGroup,
workerEventLoopGroup, sslContext);
if (userBossEventLoopGroup == null) {
server.addListener(new ClosureHook() {

View File

@ -5,9 +5,9 @@ import com.google.common.util.concurrent.AbstractService;
import com.google.net.stubby.transport.ServerListener;
import com.google.net.stubby.transport.ServerTransportListener;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
@ -31,12 +31,12 @@ import javax.annotation.Nullable;
*/
class NettyServerTransport extends AbstractService {
private static final Http2FrameLogger frameLogger = new Http2FrameLogger(InternalLogLevel.DEBUG);
private final SocketChannel channel;
private final Channel channel;
private final ServerListener serverListener;
private final SslContext sslContext;
private NettyServerHandler handler;
NettyServerTransport(SocketChannel channel, ServerListener serverListener,
NettyServerTransport(Channel channel, ServerListener serverListener,
@Nullable SslContext sslContext) {
this.channel = Preconditions.checkNotNull(channel, "channel");
this.serverListener = Preconditions.checkNotNull(serverListener, "serverListener");

View File

@ -1,222 +0,0 @@
package com.google.net.stubby.testing;
import com.google.common.util.concurrent.AbstractService;
import com.google.net.stubby.HandlerRegistry;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.SerializingExecutor;
import com.google.net.stubby.ServerCall;
import com.google.net.stubby.ServerMethodDefinition;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.ClientStream;
import com.google.net.stubby.transport.ClientStreamListener;
import com.google.net.stubby.transport.ClientTransport;
import com.google.net.stubby.transport.ClientTransportFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
/**
* Utility functions for binding clients to in-process services.
*/
public class InProcessUtils {
/**
* Create a {@link ClientTransportFactory} connected to the given
* {@link com.google.net.stubby.HandlerRegistry}
*/
public static ClientTransportFactory adaptHandlerRegistry(HandlerRegistry handlers,
ExecutorService executor) {
final ClientTransport transport = new InProcessClientTransport(handlers, executor);
return new ClientTransportFactory() {
@Override
public ClientTransport newClientTransport() {
return transport;
}
};
}
private InProcessUtils() {
}
/**
* Implementation of ClientTransport that delegates to a
* {@link com.google.net.stubby.ServerCall.Listener}
*/
private static class InProcessClientTransport extends AbstractService
implements ClientTransport {
private final HandlerRegistry handlers;
private final ExecutorService executor;
public InProcessClientTransport(HandlerRegistry handlers, ExecutorService executor) {
this.handlers = handlers;
this.executor = executor;
}
@Override
protected void doStart() {
notifyStarted();
}
@Override
public void doStop() {
notifyStopped();
}
@Override
public ClientStream newStream(MethodDescriptor<?, ?> method,
final Metadata.Headers headers,
final ClientStreamListener clientListener) {
// Separate FIFO executor queues for work on the client and server
final SerializingExecutor serverWorkQueue = new SerializingExecutor(executor);
final SerializingExecutor clientWorkQueue = new SerializingExecutor(executor);
final HandlerRegistry.Method resolvedMethod = handlers.lookupMethod("/" + method.getName());
if (resolvedMethod == null) {
// Threading?
clientWorkQueue.execute(new Runnable() {
@Override
public void run() {
clientListener.closed(Status.UNIMPLEMENTED, new Metadata.Trailers());
}
});
return new NoOpClientStream();
}
@SuppressWarnings("rawtypes")
final ServerMethodDefinition serverMethod = resolvedMethod.getMethodDefinition();
final AtomicBoolean cancelled = new AtomicBoolean();
// Implementation of ServerCall which delegates to the client listener.
@SuppressWarnings("rawtypes")
final ServerCall serverCall = new ServerCall() {
@Override
public void sendHeaders(final Metadata.Headers headers) {
clientWorkQueue.execute(new Runnable() {
@Override
public void run() {
clientListener.headersRead(headers);
}
});
}
@Override
public void sendPayload(final Object payload) {
clientWorkQueue.execute(new Runnable() {
@Override
public void run() {
try {
// TODO(user): Consider adapting at the Channel layer on the client
// so we avoid serialization costs.
@SuppressWarnings("unchecked")
InputStream message = serverMethod.streamResponse(payload);
clientListener.messageRead(message, message.available());
} catch (IOException ioe) {
close(Status.fromThrowable(ioe), new Metadata.Trailers());
}
}
});
}
@Override
public void close(final Status status, final Metadata.Trailers trailers) {
clientWorkQueue.execute(new Runnable() {
@Override
public void run() {
clientListener.closed(status, trailers);
}
});
}
@Override
public boolean isCancelled() {
return cancelled.get();
}
};
// Get the listener from the service implementation
@SuppressWarnings({"rawtypes", "unchecked"})
final ServerCall.Listener serverListener =
serverMethod.getServerCallHandler().startCall(method.getName(),
serverCall, headers);
// Return implementation of ClientStream which delegates to the server listener.
return new ClientStream() {
@Override
public void cancel() {
cancelled.set(true);
serverWorkQueue.execute(new Runnable() {
@Override
public void run() {
serverListener.onCancel();
}
});
}
@Override
public void halfClose() {
serverWorkQueue.execute(new Runnable() {
@Override
public void run() {
serverListener.onHalfClose();
}
});
}
@Override
public void writeMessage(final InputStream message, int length,
@Nullable final Runnable accepted) {
serverWorkQueue.execute(new Runnable() {
@SuppressWarnings("unchecked")
@Override
public void run() {
try {
serverListener.onPayload(serverMethod.parseRequest(message));
} catch (RuntimeException re) {
serverCall.close(Status.fromThrowable(re), new Metadata.Trailers());
} finally {
if (accepted != null) {
accepted.run();
}
}
}
});
}
@Override
public void flush() {
// No-op
}
};
}
// Simple No-Op implementation of ClientStream
private static class NoOpClientStream implements ClientStream {
@Override
public void cancel() {
// No-op
}
@Override
public void halfClose() {
// No-op
}
@Override
public void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
}
@Override
public void flush() {
}
}
}
}