diff --git a/core/src/main/java/com/google/net/stubby/ServerImpl.java b/core/src/main/java/com/google/net/stubby/ServerImpl.java index fb013fc394..102f001b24 100644 --- a/core/src/main/java/com/google/net/stubby/ServerImpl.java +++ b/core/src/main/java/com/google/net/stubby/ServerImpl.java @@ -234,7 +234,9 @@ public class ServerImpl extends AbstractService implements Server { try { HandlerRegistry.Method method = registry.lookupMethod(methodName); if (method == null) { - stream.close(Status.UNIMPLEMENTED, new Metadata.Trailers()); + stream.close( + Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName), + new Metadata.Trailers()); return; } listener = startCall(stream, methodName, method.getMethodDefinition(), headers); diff --git a/core/src/main/java/com/google/net/stubby/newtransport/TransportFrameUtil.java b/core/src/main/java/com/google/net/stubby/newtransport/TransportFrameUtil.java index 75618fba76..8140120dae 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/TransportFrameUtil.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/TransportFrameUtil.java @@ -54,7 +54,7 @@ public final class TransportFrameUtil { if (!path.startsWith("/")) { return null; } - return path.substring(1); + return path; } private TransportFrameUtil() {} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServer.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServer.java index 028e8b5842..99fa8e6c71 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServer.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServer.java @@ -8,18 +8,6 @@ import com.google.common.util.concurrent.AbstractService; import com.google.net.stubby.newtransport.ServerListener; import com.google.net.stubby.newtransport.ServerTransportListener; -import io.netty.handler.codec.http2.DefaultHttp2FrameReader; -import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; -import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; -import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; -import io.netty.handler.codec.http2.Http2Connection; -import io.netty.handler.codec.http2.Http2FrameLogger; -import io.netty.handler.codec.http2.Http2FrameReader; -import io.netty.handler.codec.http2.Http2FrameWriter; -import io.netty.handler.codec.http2.Http2InboundFrameLogger; -import io.netty.handler.codec.http2.Http2OutboundFlowController; -import io.netty.handler.codec.http2.Http2OutboundFrameLogger; -import io.netty.util.internal.logging.InternalLogLevel; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; @@ -30,8 +18,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.http2.DefaultHttp2Connection; -import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy; /** * Implementation of the {@link com.google.common.util.concurrent.Service} interface for a @@ -57,9 +43,9 @@ public class NettyServer extends AbstractService { this.channelInitializer = new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { - // TODO(user): pass a real transport object - ServerTransportListener transportListener = serverListener.transportCreated(null); - ch.pipeline().addLast(newHandler(transportListener)); + NettyServerTransport transport = new NettyServerTransport(); + transport.startAsync(); + transport.bind(ch, serverListener); } }; this.bossGroup = bossGroup; @@ -113,26 +99,4 @@ public class NettyServer extends AbstractService { workerGroup.shutdownGracefully(); } } - - private static NettyServerHandler newHandler(ServerTransportListener transportListener) { - Http2Connection connection = - new DefaultHttp2Connection(true, new DefaultHttp2StreamRemovalPolicy()); - Http2FrameReader frameReader = new DefaultHttp2FrameReader(); - Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); - - Http2FrameLogger frameLogger = new Http2FrameLogger(InternalLogLevel.DEBUG); - frameReader = new Http2InboundFrameLogger(frameReader, frameLogger); - frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger); - - DefaultHttp2InboundFlowController inboundFlow = - new DefaultHttp2InboundFlowController(connection, frameWriter); - Http2OutboundFlowController outboundFlow = - new DefaultHttp2OutboundFlowController(connection, frameWriter); - return new NettyServerHandler(transportListener, - connection, - frameReader, - frameWriter, - inboundFlow, - outboundFlow); - } } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerTransport.java new file mode 100644 index 0000000000..3084c34393 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerTransport.java @@ -0,0 +1,69 @@ +package com.google.net.stubby.newtransport.netty; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractService; +import com.google.net.stubby.newtransport.ServerListener; +import com.google.net.stubby.newtransport.ServerTransportListener; + +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http2.DefaultHttp2Connection; +import io.netty.handler.codec.http2.DefaultHttp2FrameReader; +import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; +import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; +import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; +import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.Http2FrameReader; +import io.netty.handler.codec.http2.Http2FrameWriter; +import io.netty.handler.codec.http2.Http2InboundFrameLogger; +import io.netty.handler.codec.http2.Http2OutboundFlowController; +import io.netty.handler.codec.http2.Http2OutboundFrameLogger; +import io.netty.util.internal.logging.InternalLogLevel; + +/** + * The Netty-based server transport. + */ +class NettyServerTransport extends AbstractService { + + NettyServerHandler handler; + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + // TODO(user): signal GO_AWAY and optionally terminate the socket after a timeout + notifyStopped(); + } + + /** + * This must be called when the transport is starting or running. + */ + void bind(SocketChannel ch, ServerListener serverListener) { + Preconditions.checkState(handler == null, "Handler already registered"); + ServerTransportListener transportListener = serverListener.transportCreated(this); + Http2Connection connection = + new DefaultHttp2Connection(true, new DefaultHttp2StreamRemovalPolicy()); + Http2FrameReader frameReader = new DefaultHttp2FrameReader(); + Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); + + Http2FrameLogger frameLogger = new Http2FrameLogger(InternalLogLevel.DEBUG); + frameReader = new Http2InboundFrameLogger(frameReader, frameLogger); + frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger); + + DefaultHttp2InboundFlowController inboundFlow = + new DefaultHttp2InboundFlowController(connection, frameWriter); + Http2OutboundFlowController outboundFlow = + new DefaultHttp2OutboundFlowController(connection, frameWriter); + handler = new NettyServerHandler(transportListener, + connection, + frameReader, + frameWriter, + inboundFlow, + outboundFlow); + ch.pipeline().addLast(handler); + } +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/Calls.java b/stub/src/main/java/com/google/net/stubby/stub/Calls.java index d563c43375..3fdda84be1 100644 --- a/stub/src/main/java/com/google/net/stubby/stub/Calls.java +++ b/stub/src/main/java/com/google/net/stubby/stub/Calls.java @@ -357,7 +357,7 @@ public class Calls { public void onClose(Status status, Metadata.Trailers trailers) { Preconditions.checkState(!done, "Call already closed"); if (status.isOk()) { - buffer.add(this); + buffer.add(BlockingResponseStream.this); } else { buffer.add(status.asRuntimeException()); } diff --git a/stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java b/stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java index c89e57b1d8..63c2b74f0d 100644 --- a/stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java +++ b/stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java @@ -32,7 +32,7 @@ public class ServerCalls { ReqT request; @Override public ListenableFuture onPayload(ReqT request) { - if (request == null) { + if (this.request == null) { // We delay calling method.invoke() until onHalfClose(), because application may call // close(OK) inside invoke(), while close(OK) is not allowed before onHalfClose(). this.request = request;