Fixes a few issues in netty server transport:

- Creates and passes a transport instance to ServerListener.transportCreated().
- Keeps the "/" prefix of the fully qualified method name when passing it to the handler
registry.
- Adds necessary "this." when accessing a member variable in ServerCalls.
- BlockingResponseStream.buffer should be added with BlockingResponseStream.this as
as the mark of end of data.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=76922440
This commit is contained in:
zhangkun 2014-10-02 18:11:29 -07:00 committed by Eric Anderson
parent 53acb1c8f9
commit bb9699e429
6 changed files with 78 additions and 43 deletions

View File

@ -234,7 +234,9 @@ public class ServerImpl extends AbstractService implements Server {
try {
HandlerRegistry.Method method = registry.lookupMethod(methodName);
if (method == null) {
stream.close(Status.UNIMPLEMENTED, new Metadata.Trailers());
stream.close(
Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),
new Metadata.Trailers());
return;
}
listener = startCall(stream, methodName, method.getMethodDefinition(), headers);

View File

@ -54,7 +54,7 @@ public final class TransportFrameUtil {
if (!path.startsWith("/")) {
return null;
}
return path.substring(1);
return path;
}
private TransportFrameUtil() {}

View File

@ -8,18 +8,6 @@ import com.google.common.util.concurrent.AbstractService;
import com.google.net.stubby.newtransport.ServerListener;
import com.google.net.stubby.newtransport.ServerTransportListener;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
@ -30,8 +18,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy;
/**
* Implementation of the {@link com.google.common.util.concurrent.Service} interface for a
@ -57,9 +43,9 @@ public class NettyServer extends AbstractService {
this.channelInitializer = new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// TODO(user): pass a real transport object
ServerTransportListener transportListener = serverListener.transportCreated(null);
ch.pipeline().addLast(newHandler(transportListener));
NettyServerTransport transport = new NettyServerTransport();
transport.startAsync();
transport.bind(ch, serverListener);
}
};
this.bossGroup = bossGroup;
@ -113,26 +99,4 @@ public class NettyServer extends AbstractService {
workerGroup.shutdownGracefully();
}
}
private static NettyServerHandler newHandler(ServerTransportListener transportListener) {
Http2Connection connection =
new DefaultHttp2Connection(true, new DefaultHttp2StreamRemovalPolicy());
Http2FrameReader frameReader = new DefaultHttp2FrameReader();
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
Http2FrameLogger frameLogger = new Http2FrameLogger(InternalLogLevel.DEBUG);
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
DefaultHttp2InboundFlowController inboundFlow =
new DefaultHttp2InboundFlowController(connection, frameWriter);
Http2OutboundFlowController outboundFlow =
new DefaultHttp2OutboundFlowController(connection, frameWriter);
return new NettyServerHandler(transportListener,
connection,
frameReader,
frameWriter,
inboundFlow,
outboundFlow);
}
}

View File

@ -0,0 +1,69 @@
package com.google.net.stubby.newtransport.netty;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import com.google.net.stubby.newtransport.ServerListener;
import com.google.net.stubby.newtransport.ServerTransportListener;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.util.internal.logging.InternalLogLevel;
/**
* The Netty-based server transport.
*/
class NettyServerTransport extends AbstractService {
NettyServerHandler handler;
@Override
protected void doStart() {
notifyStarted();
}
@Override
protected void doStop() {
// TODO(user): signal GO_AWAY and optionally terminate the socket after a timeout
notifyStopped();
}
/**
* This must be called when the transport is starting or running.
*/
void bind(SocketChannel ch, ServerListener serverListener) {
Preconditions.checkState(handler == null, "Handler already registered");
ServerTransportListener transportListener = serverListener.transportCreated(this);
Http2Connection connection =
new DefaultHttp2Connection(true, new DefaultHttp2StreamRemovalPolicy());
Http2FrameReader frameReader = new DefaultHttp2FrameReader();
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
Http2FrameLogger frameLogger = new Http2FrameLogger(InternalLogLevel.DEBUG);
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
DefaultHttp2InboundFlowController inboundFlow =
new DefaultHttp2InboundFlowController(connection, frameWriter);
Http2OutboundFlowController outboundFlow =
new DefaultHttp2OutboundFlowController(connection, frameWriter);
handler = new NettyServerHandler(transportListener,
connection,
frameReader,
frameWriter,
inboundFlow,
outboundFlow);
ch.pipeline().addLast(handler);
}
}

View File

@ -357,7 +357,7 @@ public class Calls {
public void onClose(Status status, Metadata.Trailers trailers) {
Preconditions.checkState(!done, "Call already closed");
if (status.isOk()) {
buffer.add(this);
buffer.add(BlockingResponseStream.this);
} else {
buffer.add(status.asRuntimeException());
}

View File

@ -32,7 +32,7 @@ public class ServerCalls {
ReqT request;
@Override
public ListenableFuture<Void> onPayload(ReqT request) {
if (request == null) {
if (this.request == null) {
// We delay calling method.invoke() until onHalfClose(), because application may call
// close(OK) inside invoke(), while close(OK) is not allowed before onHalfClose().
this.request = request;