Various updates to properly handle gRPC connection startup and shutdown.

Also updating to latest version of Netty that contains related fixes.  AbstractHttp2ConnectionHandler was renamed.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=76977422
This commit is contained in:
nathanmittler 2014-10-03 13:06:05 -07:00 committed by Eric Anderson
parent bb9699e429
commit 06ed8ec55c
8 changed files with 385 additions and 251 deletions

View File

@ -1,5 +1,8 @@
package com.google.net.stubby;
import static com.google.common.util.concurrent.Service.State.RUNNING;
import static com.google.common.util.concurrent.Service.State.STARTING;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Futures;
@ -47,7 +50,7 @@ public final class ChannelImpl extends AbstractService implements Channel {
@Override
protected void doStart() {
notifyStarted();
obtainActiveTransport(true);
}
@Override
@ -68,9 +71,10 @@ public final class ChannelImpl extends AbstractService implements Channel {
return new CallImpl<ReqT, RespT>(method, new SerializingExecutor(executor));
}
private synchronized ClientTransport obtainActiveTransport() {
private synchronized ClientTransport obtainActiveTransport(boolean notifyWhenRunning) {
if (activeTransport == null) {
if (state() != State.RUNNING) {
State state = state();
if (state != RUNNING && state != STARTING) {
throw new IllegalStateException("Not running");
}
ClientTransport newTransport = transportFactory.newClientTransport();
@ -80,19 +84,31 @@ public final class ChannelImpl extends AbstractService implements Channel {
// lock, due to reentrancy.
newTransport.addListener(
new TransportListener(newTransport), MoreExecutors.directExecutor());
if (notifyWhenRunning) {
newTransport.addListener(new Listener() {
@Override
public void running() {
notifyStarted();
}
}, executor);
}
newTransport.startAsync();
return newTransport;
}
return activeTransport;
}
private synchronized void transportFailedOrStopped(ClientTransport transport) {
private synchronized void transportFailedOrStopped(ClientTransport transport, Throwable t) {
if (activeTransport == transport) {
activeTransport = null;
}
transports.remove(transport);
if (state() != State.RUNNING && transports.isEmpty()) {
notifyStopped();
if (state() != RUNNING && transports.isEmpty()) {
if (t != null) {
notifyFailed(t);
} else {
notifyStopped();
}
}
}
@ -114,12 +130,12 @@ public final class ChannelImpl extends AbstractService implements Channel {
@Override
public void failed(State from, Throwable failure) {
transportFailedOrStopped(transport);
transportFailedOrStopped(transport, failure);
}
@Override
public void terminated(State from) {
transportFailedOrStopped(transport);
transportFailedOrStopped(transport, null);
}
}
@ -139,7 +155,7 @@ public final class ChannelImpl extends AbstractService implements Channel {
@Override
public void start(Listener<RespT> observer, Metadata.Headers headers) {
Preconditions.checkState(stream == null, "Already started");
stream = obtainActiveTransport().newStream(method, headers,
stream = obtainActiveTransport(false).newStream(method, headers,
new ClientStreamListenerImpl(observer));
}

View File

@ -15,13 +15,14 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import java.util.Map;
@ -29,7 +30,7 @@ import java.util.Map;
* Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
* request-response dialog
*/
public class Http2Codec extends AbstractHttp2ConnectionHandler {
public class Http2Codec extends Http2ConnectionHandler {
public static final int PADDING = 0;
private final RequestRegistry requestRegistry;
private final Session session;
@ -53,9 +54,10 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
* Constructor used by servers, takes a session which will receive operation events.
*/
private Http2Codec(Http2Connection connection, Session session, RequestRegistry requestRegistry) {
super(connection);
super(connection, new LazyFrameListener());
this.session = session;
this.requestRegistry = requestRegistry;
initListener();
}
@Override
@ -67,8 +69,11 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
return http2Writer;
}
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
private void initListener() {
((LazyFrameListener)((DefaultHttp2ConnectionDecoder) this.decoder()).listener()).setCodec(this);
}
private void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
boolean endOfStream) throws Http2Exception {
Request request = requestRegistry.lookup(streamId);
if (request == null) {
@ -92,14 +97,9 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx,
private void onHeadersRead(ChannelHandlerContext ctx,
int streamId,
Http2Headers headers,
int streamDependency,
short weight,
boolean exclusive,
int padding,
boolean endStream) throws Http2Exception {
Request operation = requestRegistry.lookup(streamId);
if (operation == null) {
@ -119,15 +119,7 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
}
}
@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive) throws Http2Exception {
// TODO
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
private void onRstStreamRead(int streamId) {
Request request = requestRegistry.lookup(streamId);
if (request != null) {
closeWithError(request, Status.CANCELLED.withDescription("Stream reset"));
@ -135,45 +127,6 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
}
}
@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
// TOOD
}
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
// TOOD
}
@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
// TODO
}
@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
// TODO
}
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
// TODO
}
@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode,
ByteBuf debugData) throws Http2Exception {
// TODO
}
@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
throws Http2Exception {
// TODO
}
private boolean isClient() {
return !connection().isServer();
}
@ -272,12 +225,11 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
}
public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream) {
return Http2Codec.this.writeData(ctx, streamId, data, PADDING, endStream, ctx.newPromise());
return encoder().writeData(ctx, streamId, data, PADDING, endStream, ctx.newPromise());
}
public ChannelFuture writeHeaders(int streamId, Http2Headers headers, boolean endStream) {
return Http2Codec.this.writeHeaders(ctx,
return encoder().writeHeaders(ctx,
streamId,
headers,
PADDING,
@ -291,7 +243,7 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
short weight,
boolean exclusive,
boolean endStream) {
return Http2Codec.this.writeHeaders(ctx,
return encoder().writeHeaders(ctx,
streamId,
headers,
streamDependency,
@ -303,7 +255,39 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
}
public ChannelFuture writeRstStream(int streamId, long errorCode) {
return Http2Codec.this.writeRstStream(ctx, streamId, errorCode, ctx.newPromise());
return encoder().writeRstStream(ctx, streamId, errorCode, ctx.newPromise());
}
}
private static class LazyFrameListener extends Http2FrameAdapter {
private Http2Codec codec;
void setCodec(Http2Codec codec) {
this.codec = codec;
}
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
codec.onDataRead(ctx, streamId, data, endOfStream);
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx,
int streamId,
Http2Headers headers,
int streamDependency,
short weight,
boolean exclusive,
int padding,
boolean endStream) throws Http2Exception {
codec.onHeadersRead(ctx, streamId, headers, endStream);
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
codec.onRstStreamRead(streamId);
}
}
}

View File

@ -2,6 +2,7 @@ package com.google.net.stubby.newtransport.netty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
@ -15,19 +16,17 @@ import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2OrHttpChooser;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -57,10 +56,12 @@ public class Http2Negotiator {
*/
ChannelInitializer<SocketChannel> initializer();
void onConnected(Channel channel);
/**
* Awaits completion of the protocol negotiation handshake.
* Completion future for this negotiation.
*/
void await(Channel channel);
ListenableFuture<Void> completeFuture();
}
/**
@ -70,8 +71,8 @@ public class Http2Negotiator {
Preconditions.checkNotNull(handler, "handler");
Preconditions.checkNotNull(sslEngine, "sslEngine");
final SettableFuture<Void> tlsNegotiatedHttp2 = SettableFuture.create();
if (!installJettyTLSProtocolSelection(sslEngine, tlsNegotiatedHttp2)) {
final SettableFuture<Void> completeFuture = SettableFuture.create();
if (!installJettyTLSProtocolSelection(sslEngine, completeFuture)) {
throw new IllegalStateException("NPN/ALPN extensions not installed");
}
final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@ -82,14 +83,11 @@ public class Http2Negotiator {
new GenericFutureListener<Future<? super Channel>>() {
@Override
public void operationComplete(Future<? super Channel> future) throws Exception {
if (!future.isSuccess()) {
// Throw the exception.
if (tlsNegotiatedHttp2.isDone()) {
tlsNegotiatedHttp2.get();
} else {
future.get();
}
}
// If an error occurred during the handshake, throw it
// to the pipeline.
java.util.concurrent.Future<?> doneFuture =
future.isSuccess() ? completeFuture : future;
doneFuture.get();
}
});
ch.pipeline().addLast(sslHandler);
@ -104,15 +102,13 @@ public class Http2Negotiator {
}
@Override
public void await(Channel channel) {
try {
// Wait for NPN/ALPN negotation to complete. Will throw if failed.
tlsNegotiatedHttp2.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
// Attempt to close the channel before propagating the error
channel.close();
throw new IllegalStateException("Error waiting for TLS negotiation", e);
}
public void onConnected(Channel channel) {
// Nothing to do.
}
@Override
public ListenableFuture<Void> completeFuture() {
return completeFuture;
}
};
}
@ -120,14 +116,13 @@ public class Http2Negotiator {
/**
* Create a plaintext upgrade negotiation for HTTP/1.1 to HTTP/2.
*/
public static Negotiation plaintextUpgrade(final AbstractHttp2ConnectionHandler handler) {
public static Negotiation plaintextUpgrade(final Http2ConnectionHandler handler) {
// Register the plaintext upgrader
Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(handler);
HttpClientCodec httpClientCodec = new HttpClientCodec();
final HttpClientUpgradeHandler upgrader =
new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
final UpgradeCompletionHandler completionHandler = new UpgradeCompletionHandler();
final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
@ -143,21 +138,17 @@ public class Http2Negotiator {
}
@Override
public void await(Channel channel) {
try {
// Trigger the HTTP/1.1 plaintext upgrade protocol by issuing an HTTP request
// which causes the upgrade headers to be added
Promise<Void> upgradePromise = completionHandler.getUpgradePromise();
DefaultHttpRequest upgradeTrigger =
new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
channel.writeAndFlush(upgradeTrigger);
// Wait for the upgrade to complete
upgradePromise.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
// Attempt to close the channel before propagating the error
channel.close();
throw new IllegalStateException("Error waiting for plaintext protocol upgrade", e);
}
public ListenableFuture<Void> completeFuture() {
return completionHandler.getUpgradeFuture();
}
@Override
public void onConnected(Channel channel) {
// Trigger the HTTP/1.1 plaintext upgrade protocol by issuing an HTTP request
// which causes the upgrade headers to be added
DefaultHttpRequest upgradeTrigger =
new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
channel.writeAndFlush(upgradeTrigger);
}
};
}
@ -173,13 +164,21 @@ public class Http2Negotiator {
}
};
return new Negotiation() {
private final SettableFuture<Void> completeFuture = SettableFuture.create();
@Override
public ChannelInitializer<SocketChannel> initializer() {
return initializer;
}
@Override
public void await(Channel channel) {}
public void onConnected(Channel channel) {
completeFuture.set(null);
}
@Override
public ListenableFuture<Void> completeFuture() {
return completeFuture;
}
};
}
@ -187,25 +186,19 @@ public class Http2Negotiator {
* Report protocol upgrade completion using a promise.
*/
private static class UpgradeCompletionHandler extends ChannelHandlerAdapter {
private final SettableFuture<Void> upgradeFuture = SettableFuture.create();
private Promise<Void> upgradePromise;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
upgradePromise = ctx.newPromise();
}
public Promise<Void> getUpgradePromise() {
return upgradePromise;
public ListenableFuture<Void> getUpgradeFuture() {
return upgradeFuture;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (!upgradePromise.isDone()) {
if (!upgradeFuture.isDone()) {
if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
upgradePromise.setFailure(new Throwable());
upgradeFuture.setException(new RuntimeException("HTTP/2 upgrade rejected"));
} else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
upgradePromise.setSuccess(null);
upgradeFuture.set(null);
ctx.pipeline().remove(this);
}
}
@ -214,24 +207,25 @@ public class Http2Negotiator {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (!upgradePromise.isDone()) {
upgradePromise.setFailure(new Throwable());
if (!upgradeFuture.isDone()) {
upgradeFuture.setException(new RuntimeException("Channel closed before upgrade complete"));
}
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
if (!upgradePromise.isDone()) {
upgradePromise.setFailure(new Throwable());
if (!upgradeFuture.isDone()) {
upgradeFuture.setException(
new RuntimeException("Handler unregistered before upgrade complete"));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
if (!upgradePromise.isDone()) {
upgradePromise.setFailure(cause);
if (!upgradeFuture.isDone()) {
upgradeFuture.setException(cause);
}
}
}

View File

@ -5,18 +5,22 @@ import static com.google.net.stubby.newtransport.netty.NettyClientStream.PENDING
import com.google.common.base.Preconditions;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.StreamState;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
@ -28,12 +32,13 @@ import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import javax.annotation.Nullable;
/**
* Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
* the context of the Netty Channel thread.
*/
class NettyClientHandler extends AbstractHttp2ConnectionHandler {
private static final Status GOAWAY_STATUS = Status.UNAVAILABLE;
class NettyClientHandler extends Http2ConnectionHandler {
/**
* A pending stream creation.
@ -52,14 +57,16 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
private final DefaultHttp2InboundFlowController inboundFlow;
private final Deque<PendingStream> pendingStreams = new ArrayDeque<PendingStream>();
private Status goAwayStatus = GOAWAY_STATUS;
private Throwable connectionError;
private ChannelHandlerContext ctx;
public NettyClientHandler(Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
DefaultHttp2InboundFlowController inboundFlow,
Http2OutboundFlowController outboundFlow) {
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow);
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, new LazyFrameListener());
initListener();
this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow");
// Disallow stream creation by the server.
@ -91,6 +98,17 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
return inboundFlow;
}
@Nullable
public Throwable connectionError() {
return connectionError;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
super.handlerAdded(ctx);
}
/**
* Handler for commands sent from the stream.
*/
@ -111,15 +129,13 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
}
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx,
int streamId,
Http2Headers headers,
int streamDependency,
short weight,
boolean exclusive,
int padding,
boolean endStream) throws Http2Exception {
private void initListener() {
((LazyFrameListener) ((DefaultHttp2ConnectionDecoder) this.decoder()).listener()).setHandler(
this);
}
private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream)
throws Http2Exception {
NettyClientStream stream = clientStream(connection().requireStream(streamId));
stream.inboundHeadersRecieved(headers, endStream);
}
@ -127,18 +143,23 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
/**
* Handler for an inbound HTTP/2 DATA frame.
*/
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
private void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
boolean endOfStream) throws Http2Exception {
NettyClientStream stream = clientStream(connection().requireStream(streamId));
Http2Stream http2Stream = connection().requireStream(streamId);
NettyClientStream stream = clientStream(http2Stream);
stream.inboundDataReceived(data, endOfStream);
if (stream.state() == StreamState.CLOSED && !endOfStream) {
// TODO(user): This is a hack due to the test server not consistently
// setting endOfStream on the last frame for the v1 protocol.
// Remove this once b/17692766 is fixed.
lifecycleManager().closeRemoteSide(http2Stream, ctx.newSucceededFuture());
}
}
/**
* Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
*/
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
private void onRstStreamRead(int streamId)
throws Http2Exception {
// TODO(user): do something with errorCode?
Http2Stream http2Stream = connection().requireStream(streamId);
@ -154,6 +175,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
super.channelInactive(ctx);
// Fail any streams that are awaiting creation.
Status goAwayStatus = goAwayStatus();
failPendingStreams(goAwayStatus);
// Any streams that are still active must be closed.
@ -162,29 +184,22 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
}
}
/**
* Handler for connection errors that have occurred during HTTP/2 frame processing.
*/
@Override
protected void onConnectionError(ChannelHandlerContext ctx, Http2Exception cause) {
// Save the exception that is causing us to send a GO_AWAY.
goAwayStatus = Status.fromThrowable(cause);
// Call the base class to send the GOAWAY. This will call the goingAway handler.
super.onConnectionError(ctx, cause);
}
/**
* Handler for stream errors that have occurred during HTTP/2 frame processing.
*/
@Override
protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) {
// Close the stream with a status that contains the cause.
Http2Stream stream = connection().stream(cause.streamId());
if (stream != null) {
clientStream(stream).setStatus(Status.fromThrowable(cause), new Metadata.Trailers());
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// Force the conversion of any exceptions into HTTP/2 exceptions.
Http2Exception e = Http2CodecUtil.toHttp2Exception(cause);
if (e instanceof Http2StreamException) {
// Close the stream with a status that contains the cause.
Http2Stream stream = connection().stream(((Http2StreamException) e).streamId());
if (stream != null) {
clientStream(stream).setStatus(Status.fromThrowable(cause), new Metadata.Trailers());
}
} else {
connectionError = e;
}
super.onStreamError(ctx, cause);
// Delegate to the super class for proper handling of the Http2Exception.
super.exceptionCaught(ctx, e);
}
/**
@ -221,7 +236,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
Http2Stream http2Stream = connection().requireStream(stream.id());
if (http2Stream.state() != Http2Stream.State.CLOSED) {
// Note: RST_STREAM frames are automatically flushed.
writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
}
}
@ -246,7 +261,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
// Call the base class to write the HTTP/2 DATA frame.
// Note: no need to flush since this is handled by the outbound flow controller.
writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
}
/**
@ -254,6 +269,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
*/
private void goingAway() {
// Fail any streams that are awaiting creation.
Status goAwayStatus = goAwayStatus();
failPendingStreams(goAwayStatus);
if (connection().local().isGoAwayReceived()) {
@ -285,6 +301,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
private void createPendingStreams() {
Http2Connection connection = connection();
Http2Connection.Endpoint local = connection.local();
Status goAwayStatus = goAwayStatus();
while (!pendingStreams.isEmpty()) {
final int streamId = local.nextStreamId();
if (streamId <= 0) {
@ -308,7 +325,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
// Finish creation of the stream by writing a headers frame.
final PendingStream pendingStream = pendingStreams.remove();
writeHeaders(ctx(), streamId, pendingStream.headers, 0, false, ctx().newPromise())
encoder().writeHeaders(ctx, streamId, pendingStream.headers, 0, false, ctx.newPromise())
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -320,10 +337,20 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
}
}
});
ctx().flush();
ctx.flush();
}
}
/**
* Returns the appropriate status used to represent the cause for GOAWAY.
*/
private Status goAwayStatus() {
if (connectionError != null) {
return Status.fromThrowable(connectionError);
}
return Status.UNAVAILABLE;
}
/**
* Handles the successful creation of a new stream.
*/
@ -390,11 +417,43 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
clientStream(stream).setStatus(
Status.INTERNAL.withDescription("Stream in invalid state: " + stream.state()),
new Metadata.Trailers());
writeRstStream(ctx(), stream.id(), Http2Error.INTERNAL_ERROR.code(), ctx().newPromise());
ctx().flush();
encoder().writeRstStream(ctx, stream.id(), Http2Error.INTERNAL_ERROR.code(),
ctx.newPromise());
break;
default:
break;
}
}
private static class LazyFrameListener extends Http2FrameAdapter {
private NettyClientHandler handler;
void setHandler(NettyClientHandler handler) {
this.handler = handler;
}
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
handler.onDataRead(ctx, streamId, data, endOfStream);
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx,
int streamId,
Http2Headers headers,
int streamDependency,
short weight,
boolean exclusive,
int padding,
boolean endStream) throws Http2Exception {
handler.onHeadersRead(streamId, headers, endStream);
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
handler.onRstStreamRead(streamId);
}
}
}

View File

@ -3,6 +3,9 @@ package com.google.net.stubby.newtransport.netty;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.newtransport.AbstractClientTransport;
@ -125,24 +128,50 @@ class NettyClientTransport extends AbstractClientTransport {
b.connect(address).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
channel = future.channel();
notifyStarted();
// Listen for the channel close event.
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
notifyStopped();
} else {
notifyFailed(future.cause());
}
}
});
} else {
if (!future.isSuccess()) {
// The connection attempt failed.
notifyFailed(future.cause());
return;
}
// Connected successfully, start the protocol negotiation.
channel = future.channel();
negotiation.onConnected(channel);
final ListenableFuture<Void> negotiationFuture = negotiation.completeFuture();
Futures.addCallback(negotiationFuture, new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
// The negotiation was successful.
notifyStarted();
// Handle transport shutdown when the channel is closed.
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// The close failed. Just notify that transport shutdown failed.
notifyFailed(future.cause());
return;
}
if (handler.connectionError() != null) {
// The handler encountered a connection error.
notifyFailed(handler.connectionError());
} else {
// Normal termination of the connection.
notifyStopped();
}
}
});
}
@Override
public void onFailure(Throwable t) {
// The negotiation failed.
notifyFailed(t);
}
});
}
});
}
@ -154,10 +183,6 @@ class NettyClientTransport extends AbstractClientTransport {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (eventGroup != null) {
eventGroup.shutdownGracefully();
}
}
private static NettyClientHandler newHandler() {

View File

@ -16,12 +16,15 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
@ -30,7 +33,6 @@ import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamException;
import io.netty.util.ReferenceCountUtil;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -38,7 +40,7 @@ import java.util.logging.Logger;
* Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
* the context of the Netty Channel thread.
*/
class NettyServerHandler extends AbstractHttp2ConnectionHandler {
class NettyServerHandler extends Http2ConnectionHandler {
private static Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
@ -52,30 +54,29 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler {
Http2FrameWriter frameWriter,
DefaultHttp2InboundFlowController inboundFlow,
Http2OutboundFlowController outboundFlow) {
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow);
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, new LazyFrameListener());
this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener");
this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow");
initListener();
connection.local().allowPushTo(false);
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx,
int streamId,
Http2Headers headers,
int streamDependency,
short weight,
boolean exclusive,
int padding,
boolean endStream) throws Http2Exception {
private void initListener() {
((LazyFrameListener) ((DefaultHttp2ConnectionDecoder) this.decoder()).listener()).setHandler(
this);
}
private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
throws Http2Exception {
try {
NettyServerStream stream = new NettyServerStream(ctx.channel(), streamId, inboundFlow);
// The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this method.
// The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
// method.
Http2Stream http2Stream = connection().requireStream(streamId);
http2Stream.data(stream);
String method = determineMethod(streamId, headers);
ServerStreamListener listener = transportListener.streamCreated(stream, method,
Utils.convertHeaders(headers));
ServerStreamListener listener =
transportListener.streamCreated(stream, method, Utils.convertHeaders(headers));
stream.setListener(listener);
} catch (Http2Exception e) {
throw e;
@ -85,12 +86,7 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler {
}
}
@Override
public void onDataRead(ChannelHandlerContext ctx,
int streamId,
ByteBuf data,
int padding,
boolean endOfStream) throws Http2Exception {
private void onDataRead(int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception {
try {
NettyServerStream stream = serverStream(connection().requireStream(streamId));
stream.inboundDataReceived(data, endOfStream);
@ -102,9 +98,7 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler {
}
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
private void onRstStreamRead(int streamId) throws Http2Exception {
try {
NettyServerStream stream = serverStream(connection().requireStream(streamId));
stream.abortStream(Status.CANCELLED, false);
@ -116,25 +110,23 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler {
}
}
/**
* Handler for stream errors that have occurred during HTTP/2 frame processing.
*
* <p>When a callback method of this class throws an Http2StreamException,
* it will be handled by this method. Other types of exceptions will be handled by
* {@link #onConnectionError(ChannelHandlerContext, Http2Exception)} from the base class. The
* catch-all logic is in {@link #decode(ChannelHandlerContext, ByteBuf, List)} from the base class.
*/
@Override
protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) {
// Aborts the stream with a status that contains the cause.
Http2Stream stream = connection().stream(cause.streamId());
if (stream != null) {
// Send the error message to the client to help debugging.
serverStream(stream).abortStream(Status.fromThrowable(cause), true);
} else {
// Only call the base class if we cannot anything about it.
super.onStreamError(ctx, cause);
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// Force the conversion of any exceptions into HTTP/2 exceptions.
Http2Exception e = Http2CodecUtil.toHttp2Exception(cause);
if (e instanceof Http2StreamException) {
// Aborts the stream with a status that contains the cause.
Http2Stream stream = connection().stream(((Http2StreamException)cause).streamId());
if (stream != null) {
// Send the error message to the client to help debugging.
serverStream(stream).abortStream(Status.fromThrowable(cause), true);
// We've already handled it, don't call the base class.
return;
}
}
// Delegate to the super class for proper handling of the Http2Exception.
super.exceptionCaught(ctx, e);
}
/**
@ -167,11 +159,11 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler {
});
}
// Call the base class to write the HTTP/2 DATA frame.
writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
ctx.flush();
} else if (msg instanceof SendResponseHeadersCommand) {
SendResponseHeadersCommand cmd = (SendResponseHeadersCommand) msg;
writeHeaders(ctx,
encoder().writeHeaders(ctx,
cmd.streamId(),
new DefaultHttp2Headers()
.status(STATUS_OK)
@ -214,4 +206,36 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler {
private NettyServerStream serverStream(Http2Stream stream) {
return stream.<NettyServerStream>data();
}
private static class LazyFrameListener extends Http2FrameAdapter {
private NettyServerHandler handler;
void setHandler(NettyServerHandler handler) {
this.handler = handler;
}
@Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
handler.onDataRead(streamId, data, endOfStream);
}
@Override
public void onHeadersRead(ChannelHandlerContext ctx,
int streamId,
Http2Headers headers,
int streamDependency,
short weight,
boolean exclusive,
int padding,
boolean endStream) throws Http2Exception {
handler.onHeadersRead(ctx, streamId, headers);
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
handler.onRstStreamRead(streamId);
}
}
}

View File

@ -1,5 +1,6 @@
package com.google.net.stubby.newtransport.okhttp;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.SerializingExecutor;
import com.google.net.stubby.Status;
@ -12,6 +13,7 @@ import okio.Buffer;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
class AsyncFrameWriter implements FrameWriter {
@ -177,12 +179,28 @@ class AsyncFrameWriter implements FrameWriter {
@Override
public void close() {
executor.execute(new WriteRunnable() {
// Wait for the frameWriter to close.
final SettableFuture<?> closeFuture = SettableFuture.create();
executor.execute(new Runnable() {
@Override
public void doRun() throws IOException {
frameWriter.close();
public void run() {
try {
frameWriter.close();
} catch (IOException e) {
closeFuture.setException(e);
} finally {
closeFuture.set(null);
}
}
});
try {
closeFuture.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
private abstract class WriteRunnable implements Runnable {

View File

@ -100,6 +100,8 @@ public class OkHttpClientTransport extends AbstractClientTransport {
private boolean goAway;
@GuardedBy("lock")
private Status goAwayStatus;
@GuardedBy("lock")
private boolean stopped;
OkHttpClientTransport(InetSocketAddress address, Executor executor) {
this.address = Preconditions.checkNotNull(address);
@ -149,9 +151,9 @@ public class OkHttpClientTransport extends AbstractClientTransport {
frameWriter = new AsyncFrameWriter(variant.newWriter(sink, true), this, executor);
}
notifyStarted();
clientFrameHandler = new ClientFrameHandler();
executor.execute(clientFrameHandler);
notifyStarted();
}
@Override
@ -161,10 +163,11 @@ public class OkHttpClientTransport extends AbstractClientTransport {
normalClose = !goAway;
}
if (normalClose) {
abort(Status.INTERNAL.withDescription("Transport stopped"));
// Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams.
// The GOAWAY is part of graceful shutdown.
frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
abort(Status.INTERNAL.withDescription("Transport stopped"));
}
stopIfNecessary();
}
@ -203,7 +206,10 @@ public class OkHttpClientTransport extends AbstractClientTransport {
// Starting stop, go into STOPPING state so that Channel know this Transport should not be used
// further, will become STOPPED once all streams are complete.
stopAsync();
State state = state();
if (state == State.RUNNING || state == State.NEW) {
stopAsync();
}
for (OkHttpClientStream stream : goAwayStreams) {
stream.setStatus(status, new Metadata.Trailers());
@ -233,8 +239,16 @@ public class OkHttpClientTransport extends AbstractClientTransport {
boolean shouldStop;
synchronized (lock) {
shouldStop = (goAway && streams.size() == 0);
if (shouldStop) {
if (stopped) {
// We've already stopped, don't stop again.
shouldStop = false;
}
stopped = true;
}
}
if (shouldStop) {
// Wait for the frame writer to close.
frameWriter.close();
try {
frameReader.close();