From 06ed8ec55c34c882afb27ac398d9386843374ef9 Mon Sep 17 00:00:00 2001 From: nathanmittler Date: Fri, 3 Oct 2014 13:06:05 -0700 Subject: [PATCH] Various updates to properly handle gRPC connection startup and shutdown. Also updating to latest version of Netty that contains related fixes. AbstractHttp2ConnectionHandler was renamed. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=76977422 --- .../com/google/net/stubby/ChannelImpl.java | 34 ++-- .../net/stubby/http2/netty/Http2Codec.java | 114 ++++++------- .../newtransport/netty/Http2Negotiator.java | 114 +++++++------ .../netty/NettyClientHandler.java | 151 ++++++++++++------ .../netty/NettyClientTransport.java | 65 +++++--- .../netty/NettyServerHandler.java | 114 +++++++------ .../newtransport/okhttp/AsyncFrameWriter.java | 24 ++- .../okhttp/OkHttpClientTransport.java | 20 ++- 8 files changed, 385 insertions(+), 251 deletions(-) diff --git a/core/src/main/java/com/google/net/stubby/ChannelImpl.java b/core/src/main/java/com/google/net/stubby/ChannelImpl.java index 121ffb186d..b73563d8cb 100644 --- a/core/src/main/java/com/google/net/stubby/ChannelImpl.java +++ b/core/src/main/java/com/google/net/stubby/ChannelImpl.java @@ -1,5 +1,8 @@ package com.google.net.stubby; +import static com.google.common.util.concurrent.Service.State.RUNNING; +import static com.google.common.util.concurrent.Service.State.STARTING; + import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.Futures; @@ -47,7 +50,7 @@ public final class ChannelImpl extends AbstractService implements Channel { @Override protected void doStart() { - notifyStarted(); + obtainActiveTransport(true); } @Override @@ -68,9 +71,10 @@ public final class ChannelImpl extends AbstractService implements Channel { return new CallImpl(method, new SerializingExecutor(executor)); } - private synchronized ClientTransport obtainActiveTransport() { + private synchronized ClientTransport obtainActiveTransport(boolean notifyWhenRunning) { if (activeTransport == null) { - if (state() != State.RUNNING) { + State state = state(); + if (state != RUNNING && state != STARTING) { throw new IllegalStateException("Not running"); } ClientTransport newTransport = transportFactory.newClientTransport(); @@ -80,19 +84,31 @@ public final class ChannelImpl extends AbstractService implements Channel { // lock, due to reentrancy. newTransport.addListener( new TransportListener(newTransport), MoreExecutors.directExecutor()); + if (notifyWhenRunning) { + newTransport.addListener(new Listener() { + @Override + public void running() { + notifyStarted(); + } + }, executor); + } newTransport.startAsync(); return newTransport; } return activeTransport; } - private synchronized void transportFailedOrStopped(ClientTransport transport) { + private synchronized void transportFailedOrStopped(ClientTransport transport, Throwable t) { if (activeTransport == transport) { activeTransport = null; } transports.remove(transport); - if (state() != State.RUNNING && transports.isEmpty()) { - notifyStopped(); + if (state() != RUNNING && transports.isEmpty()) { + if (t != null) { + notifyFailed(t); + } else { + notifyStopped(); + } } } @@ -114,12 +130,12 @@ public final class ChannelImpl extends AbstractService implements Channel { @Override public void failed(State from, Throwable failure) { - transportFailedOrStopped(transport); + transportFailedOrStopped(transport, failure); } @Override public void terminated(State from) { - transportFailedOrStopped(transport); + transportFailedOrStopped(transport, null); } } @@ -139,7 +155,7 @@ public final class ChannelImpl extends AbstractService implements Channel { @Override public void start(Listener observer, Metadata.Headers headers) { Preconditions.checkState(stream == null, "Already started"); - stream = obtainActiveTransport().newStream(method, headers, + stream = obtainActiveTransport(false).newStream(method, headers, new ClientStreamListenerImpl(observer)); } diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java index f03d65bdf4..88cee4392c 100644 --- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java +++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java @@ -15,13 +15,14 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.AsciiString; -import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; import io.netty.handler.codec.http2.DefaultHttp2Connection; +import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2FrameAdapter; import io.netty.handler.codec.http2.Http2Headers; -import io.netty.handler.codec.http2.Http2Settings; import java.util.Map; @@ -29,7 +30,7 @@ import java.util.Map; * Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing * request-response dialog */ -public class Http2Codec extends AbstractHttp2ConnectionHandler { +public class Http2Codec extends Http2ConnectionHandler { public static final int PADDING = 0; private final RequestRegistry requestRegistry; private final Session session; @@ -53,9 +54,10 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler { * Constructor used by servers, takes a session which will receive operation events. */ private Http2Codec(Http2Connection connection, Session session, RequestRegistry requestRegistry) { - super(connection); + super(connection, new LazyFrameListener()); this.session = session; this.requestRegistry = requestRegistry; + initListener(); } @Override @@ -67,8 +69,11 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler { return http2Writer; } - @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + private void initListener() { + ((LazyFrameListener)((DefaultHttp2ConnectionDecoder) this.decoder()).listener()).setCodec(this); + } + + private void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception { Request request = requestRegistry.lookup(streamId); if (request == null) { @@ -92,14 +97,9 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler { } } - @Override - public void onHeadersRead(ChannelHandlerContext ctx, + private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, - int streamDependency, - short weight, - boolean exclusive, - int padding, boolean endStream) throws Http2Exception { Request operation = requestRegistry.lookup(streamId); if (operation == null) { @@ -119,15 +119,7 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler { } } - @Override - public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, - short weight, boolean exclusive) throws Http2Exception { - // TODO - } - - @Override - public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) - throws Http2Exception { + private void onRstStreamRead(int streamId) { Request request = requestRegistry.lookup(streamId); if (request != null) { closeWithError(request, Status.CANCELLED.withDescription("Stream reset")); @@ -135,45 +127,6 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler { } } - @Override - public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception { - // TOOD - } - - @Override - public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) - throws Http2Exception { - // TOOD - } - - @Override - public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { - // TODO - } - - @Override - public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception { - // TODO - } - - @Override - public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, - Http2Headers headers, int padding) throws Http2Exception { - // TODO - } - - @Override - public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, - ByteBuf debugData) throws Http2Exception { - // TODO - } - - @Override - public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) - throws Http2Exception { - // TODO - } - private boolean isClient() { return !connection().isServer(); } @@ -272,12 +225,11 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler { } public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream) { - return Http2Codec.this.writeData(ctx, streamId, data, PADDING, endStream, ctx.newPromise()); + return encoder().writeData(ctx, streamId, data, PADDING, endStream, ctx.newPromise()); } public ChannelFuture writeHeaders(int streamId, Http2Headers headers, boolean endStream) { - - return Http2Codec.this.writeHeaders(ctx, + return encoder().writeHeaders(ctx, streamId, headers, PADDING, @@ -291,7 +243,7 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler { short weight, boolean exclusive, boolean endStream) { - return Http2Codec.this.writeHeaders(ctx, + return encoder().writeHeaders(ctx, streamId, headers, streamDependency, @@ -303,7 +255,39 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler { } public ChannelFuture writeRstStream(int streamId, long errorCode) { - return Http2Codec.this.writeRstStream(ctx, streamId, errorCode, ctx.newPromise()); + return encoder().writeRstStream(ctx, streamId, errorCode, ctx.newPromise()); + } + } + + private static class LazyFrameListener extends Http2FrameAdapter { + private Http2Codec codec; + + void setCodec(Http2Codec codec) { + this.codec = codec; + } + + @Override + public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream) throws Http2Exception { + codec.onDataRead(ctx, streamId, data, endOfStream); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, + int streamId, + Http2Headers headers, + int streamDependency, + short weight, + boolean exclusive, + int padding, + boolean endStream) throws Http2Exception { + codec.onHeadersRead(ctx, streamId, headers, endStream); + } + + @Override + public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) + throws Http2Exception { + codec.onRstStreamRead(streamId); } } } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/Http2Negotiator.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/Http2Negotiator.java index c90610104f..16adae6739 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/Http2Negotiator.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/Http2Negotiator.java @@ -2,6 +2,7 @@ package com.google.net.stubby.newtransport.netty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.netty.channel.Channel; @@ -15,19 +16,17 @@ import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpClientUpgradeHandler; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; import io.netty.handler.codec.http2.Http2ClientUpgradeCodec; +import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2OrHttpChooser; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -57,10 +56,12 @@ public class Http2Negotiator { */ ChannelInitializer initializer(); + void onConnected(Channel channel); + /** - * Awaits completion of the protocol negotiation handshake. + * Completion future for this negotiation. */ - void await(Channel channel); + ListenableFuture completeFuture(); } /** @@ -70,8 +71,8 @@ public class Http2Negotiator { Preconditions.checkNotNull(handler, "handler"); Preconditions.checkNotNull(sslEngine, "sslEngine"); - final SettableFuture tlsNegotiatedHttp2 = SettableFuture.create(); - if (!installJettyTLSProtocolSelection(sslEngine, tlsNegotiatedHttp2)) { + final SettableFuture completeFuture = SettableFuture.create(); + if (!installJettyTLSProtocolSelection(sslEngine, completeFuture)) { throw new IllegalStateException("NPN/ALPN extensions not installed"); } final ChannelInitializer initializer = new ChannelInitializer() { @@ -82,14 +83,11 @@ public class Http2Negotiator { new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { - if (!future.isSuccess()) { - // Throw the exception. - if (tlsNegotiatedHttp2.isDone()) { - tlsNegotiatedHttp2.get(); - } else { - future.get(); - } - } + // If an error occurred during the handshake, throw it + // to the pipeline. + java.util.concurrent.Future doneFuture = + future.isSuccess() ? completeFuture : future; + doneFuture.get(); } }); ch.pipeline().addLast(sslHandler); @@ -104,15 +102,13 @@ public class Http2Negotiator { } @Override - public void await(Channel channel) { - try { - // Wait for NPN/ALPN negotation to complete. Will throw if failed. - tlsNegotiatedHttp2.get(5, TimeUnit.SECONDS); - } catch (Exception e) { - // Attempt to close the channel before propagating the error - channel.close(); - throw new IllegalStateException("Error waiting for TLS negotiation", e); - } + public void onConnected(Channel channel) { + // Nothing to do. + } + + @Override + public ListenableFuture completeFuture() { + return completeFuture; } }; } @@ -120,14 +116,13 @@ public class Http2Negotiator { /** * Create a plaintext upgrade negotiation for HTTP/1.1 to HTTP/2. */ - public static Negotiation plaintextUpgrade(final AbstractHttp2ConnectionHandler handler) { + public static Negotiation plaintextUpgrade(final Http2ConnectionHandler handler) { // Register the plaintext upgrader Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(handler); HttpClientCodec httpClientCodec = new HttpClientCodec(); final HttpClientUpgradeHandler upgrader = new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000); final UpgradeCompletionHandler completionHandler = new UpgradeCompletionHandler(); - final ChannelInitializer initializer = new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { @@ -143,21 +138,17 @@ public class Http2Negotiator { } @Override - public void await(Channel channel) { - try { - // Trigger the HTTP/1.1 plaintext upgrade protocol by issuing an HTTP request - // which causes the upgrade headers to be added - Promise upgradePromise = completionHandler.getUpgradePromise(); - DefaultHttpRequest upgradeTrigger = - new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); - channel.writeAndFlush(upgradeTrigger); - // Wait for the upgrade to complete - upgradePromise.get(5, TimeUnit.SECONDS); - } catch (Exception e) { - // Attempt to close the channel before propagating the error - channel.close(); - throw new IllegalStateException("Error waiting for plaintext protocol upgrade", e); - } + public ListenableFuture completeFuture() { + return completionHandler.getUpgradeFuture(); + } + + @Override + public void onConnected(Channel channel) { + // Trigger the HTTP/1.1 plaintext upgrade protocol by issuing an HTTP request + // which causes the upgrade headers to be added + DefaultHttpRequest upgradeTrigger = + new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + channel.writeAndFlush(upgradeTrigger); } }; } @@ -173,13 +164,21 @@ public class Http2Negotiator { } }; return new Negotiation() { + private final SettableFuture completeFuture = SettableFuture.create(); @Override public ChannelInitializer initializer() { return initializer; } @Override - public void await(Channel channel) {} + public void onConnected(Channel channel) { + completeFuture.set(null); + } + + @Override + public ListenableFuture completeFuture() { + return completeFuture; + } }; } @@ -187,25 +186,19 @@ public class Http2Negotiator { * Report protocol upgrade completion using a promise. */ private static class UpgradeCompletionHandler extends ChannelHandlerAdapter { + private final SettableFuture upgradeFuture = SettableFuture.create(); - private Promise upgradePromise; - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - upgradePromise = ctx.newPromise(); - } - - public Promise getUpgradePromise() { - return upgradePromise; + public ListenableFuture getUpgradeFuture() { + return upgradeFuture; } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (!upgradePromise.isDone()) { + if (!upgradeFuture.isDone()) { if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) { - upgradePromise.setFailure(new Throwable()); + upgradeFuture.setException(new RuntimeException("HTTP/2 upgrade rejected")); } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) { - upgradePromise.setSuccess(null); + upgradeFuture.set(null); ctx.pipeline().remove(this); } } @@ -214,24 +207,25 @@ public class Http2Negotiator { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); - if (!upgradePromise.isDone()) { - upgradePromise.setFailure(new Throwable()); + if (!upgradeFuture.isDone()) { + upgradeFuture.setException(new RuntimeException("Channel closed before upgrade complete")); } } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); - if (!upgradePromise.isDone()) { - upgradePromise.setFailure(new Throwable()); + if (!upgradeFuture.isDone()) { + upgradeFuture.setException( + new RuntimeException("Handler unregistered before upgrade complete")); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); - if (!upgradePromise.isDone()) { - upgradePromise.setFailure(cause); + if (!upgradeFuture.isDone()) { + upgradeFuture.setException(cause); } } } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java index 9f07d2a468..19cbecaa1c 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java @@ -5,18 +5,22 @@ import static com.google.net.stubby.newtransport.netty.NettyClientStream.PENDING import com.google.common.base.Preconditions; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; +import com.google.net.stubby.newtransport.StreamState; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; +import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; +import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2ConnectionAdapter; +import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2FrameAdapter; import io.netty.handler.codec.http2.Http2FrameReader; import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2Headers; @@ -28,12 +32,13 @@ import java.util.ArrayDeque; import java.util.Deque; import java.util.Iterator; +import javax.annotation.Nullable; + /** * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within * the context of the Netty Channel thread. */ -class NettyClientHandler extends AbstractHttp2ConnectionHandler { - private static final Status GOAWAY_STATUS = Status.UNAVAILABLE; +class NettyClientHandler extends Http2ConnectionHandler { /** * A pending stream creation. @@ -52,14 +57,16 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { private final DefaultHttp2InboundFlowController inboundFlow; private final Deque pendingStreams = new ArrayDeque(); - private Status goAwayStatus = GOAWAY_STATUS; + private Throwable connectionError; + private ChannelHandlerContext ctx; public NettyClientHandler(Http2Connection connection, Http2FrameReader frameReader, Http2FrameWriter frameWriter, DefaultHttp2InboundFlowController inboundFlow, Http2OutboundFlowController outboundFlow) { - super(connection, frameReader, frameWriter, inboundFlow, outboundFlow); + super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, new LazyFrameListener()); + initListener(); this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow"); // Disallow stream creation by the server. @@ -91,6 +98,17 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { return inboundFlow; } + @Nullable + public Throwable connectionError() { + return connectionError; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; + super.handlerAdded(ctx); + } + /** * Handler for commands sent from the stream. */ @@ -111,15 +129,13 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { } } - @Override - public void onHeadersRead(ChannelHandlerContext ctx, - int streamId, - Http2Headers headers, - int streamDependency, - short weight, - boolean exclusive, - int padding, - boolean endStream) throws Http2Exception { + private void initListener() { + ((LazyFrameListener) ((DefaultHttp2ConnectionDecoder) this.decoder()).listener()).setHandler( + this); + } + + private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) + throws Http2Exception { NettyClientStream stream = clientStream(connection().requireStream(streamId)); stream.inboundHeadersRecieved(headers, endStream); } @@ -127,18 +143,23 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { /** * Handler for an inbound HTTP/2 DATA frame. */ - @Override - public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + private void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception { - NettyClientStream stream = clientStream(connection().requireStream(streamId)); + Http2Stream http2Stream = connection().requireStream(streamId); + NettyClientStream stream = clientStream(http2Stream); stream.inboundDataReceived(data, endOfStream); + if (stream.state() == StreamState.CLOSED && !endOfStream) { + // TODO(user): This is a hack due to the test server not consistently + // setting endOfStream on the last frame for the v1 protocol. + // Remove this once b/17692766 is fixed. + lifecycleManager().closeRemoteSide(http2Stream, ctx.newSucceededFuture()); + } } /** * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream. */ - @Override - public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) + private void onRstStreamRead(int streamId) throws Http2Exception { // TODO(user): do something with errorCode? Http2Stream http2Stream = connection().requireStream(streamId); @@ -154,6 +175,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { super.channelInactive(ctx); // Fail any streams that are awaiting creation. + Status goAwayStatus = goAwayStatus(); failPendingStreams(goAwayStatus); // Any streams that are still active must be closed. @@ -162,29 +184,22 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { } } - /** - * Handler for connection errors that have occurred during HTTP/2 frame processing. - */ @Override - protected void onConnectionError(ChannelHandlerContext ctx, Http2Exception cause) { - // Save the exception that is causing us to send a GO_AWAY. - goAwayStatus = Status.fromThrowable(cause); - - // Call the base class to send the GOAWAY. This will call the goingAway handler. - super.onConnectionError(ctx, cause); - } - - /** - * Handler for stream errors that have occurred during HTTP/2 frame processing. - */ - @Override - protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) { - // Close the stream with a status that contains the cause. - Http2Stream stream = connection().stream(cause.streamId()); - if (stream != null) { - clientStream(stream).setStatus(Status.fromThrowable(cause), new Metadata.Trailers()); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + // Force the conversion of any exceptions into HTTP/2 exceptions. + Http2Exception e = Http2CodecUtil.toHttp2Exception(cause); + if (e instanceof Http2StreamException) { + // Close the stream with a status that contains the cause. + Http2Stream stream = connection().stream(((Http2StreamException) e).streamId()); + if (stream != null) { + clientStream(stream).setStatus(Status.fromThrowable(cause), new Metadata.Trailers()); + } + } else { + connectionError = e; } - super.onStreamError(ctx, cause); + + // Delegate to the super class for proper handling of the Http2Exception. + super.exceptionCaught(ctx, e); } /** @@ -221,7 +236,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { Http2Stream http2Stream = connection().requireStream(stream.id()); if (http2Stream.state() != Http2Stream.State.CLOSED) { // Note: RST_STREAM frames are automatically flushed. - writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise); + encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise); } } @@ -246,7 +261,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { // Call the base class to write the HTTP/2 DATA frame. // Note: no need to flush since this is handled by the outbound flow controller. - writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise); + encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise); } /** @@ -254,6 +269,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { */ private void goingAway() { // Fail any streams that are awaiting creation. + Status goAwayStatus = goAwayStatus(); failPendingStreams(goAwayStatus); if (connection().local().isGoAwayReceived()) { @@ -285,6 +301,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { private void createPendingStreams() { Http2Connection connection = connection(); Http2Connection.Endpoint local = connection.local(); + Status goAwayStatus = goAwayStatus(); while (!pendingStreams.isEmpty()) { final int streamId = local.nextStreamId(); if (streamId <= 0) { @@ -308,7 +325,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { // Finish creation of the stream by writing a headers frame. final PendingStream pendingStream = pendingStreams.remove(); - writeHeaders(ctx(), streamId, pendingStream.headers, 0, false, ctx().newPromise()) + encoder().writeHeaders(ctx, streamId, pendingStream.headers, 0, false, ctx.newPromise()) .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -320,10 +337,20 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { } } }); - ctx().flush(); + ctx.flush(); } } + /** + * Returns the appropriate status used to represent the cause for GOAWAY. + */ + private Status goAwayStatus() { + if (connectionError != null) { + return Status.fromThrowable(connectionError); + } + return Status.UNAVAILABLE; + } + /** * Handles the successful creation of a new stream. */ @@ -390,11 +417,43 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { clientStream(stream).setStatus( Status.INTERNAL.withDescription("Stream in invalid state: " + stream.state()), new Metadata.Trailers()); - writeRstStream(ctx(), stream.id(), Http2Error.INTERNAL_ERROR.code(), ctx().newPromise()); - ctx().flush(); + encoder().writeRstStream(ctx, stream.id(), Http2Error.INTERNAL_ERROR.code(), + ctx.newPromise()); break; default: break; } } + + private static class LazyFrameListener extends Http2FrameAdapter { + private NettyClientHandler handler; + + void setHandler(NettyClientHandler handler) { + this.handler = handler; + } + + @Override + public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream) throws Http2Exception { + handler.onDataRead(ctx, streamId, data, endOfStream); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, + int streamId, + Http2Headers headers, + int streamDependency, + short weight, + boolean exclusive, + int padding, + boolean endStream) throws Http2Exception { + handler.onHeadersRead(streamId, headers, endStream); + } + + @Override + public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) + throws Http2Exception { + handler.onRstStreamRead(streamId); + } + } } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java index 9762968c8d..c6f59493d9 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java @@ -3,6 +3,9 @@ package com.google.net.stubby.newtransport.netty; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Metadata; import com.google.net.stubby.MethodDescriptor; import com.google.net.stubby.newtransport.AbstractClientTransport; @@ -125,24 +128,50 @@ class NettyClientTransport extends AbstractClientTransport { b.connect(address).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - channel = future.channel(); - notifyStarted(); - - // Listen for the channel close event. - channel.closeFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - notifyStopped(); - } else { - notifyFailed(future.cause()); - } - } - }); - } else { + if (!future.isSuccess()) { + // The connection attempt failed. notifyFailed(future.cause()); + return; } + + // Connected successfully, start the protocol negotiation. + channel = future.channel(); + negotiation.onConnected(channel); + + final ListenableFuture negotiationFuture = negotiation.completeFuture(); + Futures.addCallback(negotiationFuture, new FutureCallback() { + @Override + public void onSuccess(Void result) { + // The negotiation was successful. + notifyStarted(); + + // Handle transport shutdown when the channel is closed. + channel.closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + // The close failed. Just notify that transport shutdown failed. + notifyFailed(future.cause()); + return; + } + + if (handler.connectionError() != null) { + // The handler encountered a connection error. + notifyFailed(handler.connectionError()); + } else { + // Normal termination of the connection. + notifyStopped(); + } + } + }); + } + + @Override + public void onFailure(Throwable t) { + // The negotiation failed. + notifyFailed(t); + } + }); } }); } @@ -154,10 +183,6 @@ class NettyClientTransport extends AbstractClientTransport { if (channel != null && channel.isOpen()) { channel.close(); } - - if (eventGroup != null) { - eventGroup.shutdownGracefully(); - } } private static NettyClientHandler newHandler() { diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java index d12684d9a3..96bcef29ce 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java @@ -16,12 +16,15 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler; +import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; +import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2FrameAdapter; import io.netty.handler.codec.http2.Http2FrameReader; import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2Headers; @@ -30,7 +33,6 @@ import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2StreamException; import io.netty.util.ReferenceCountUtil; -import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -38,7 +40,7 @@ import java.util.logging.Logger; * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within * the context of the Netty Channel thread. */ -class NettyServerHandler extends AbstractHttp2ConnectionHandler { +class NettyServerHandler extends Http2ConnectionHandler { private static Logger logger = Logger.getLogger(NettyServerHandler.class.getName()); @@ -52,30 +54,29 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler { Http2FrameWriter frameWriter, DefaultHttp2InboundFlowController inboundFlow, Http2OutboundFlowController outboundFlow) { - super(connection, frameReader, frameWriter, inboundFlow, outboundFlow); + super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, new LazyFrameListener()); this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener"); this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow"); - + initListener(); connection.local().allowPushTo(false); } - @Override - public void onHeadersRead(ChannelHandlerContext ctx, - int streamId, - Http2Headers headers, - int streamDependency, - short weight, - boolean exclusive, - int padding, - boolean endStream) throws Http2Exception { + private void initListener() { + ((LazyFrameListener) ((DefaultHttp2ConnectionDecoder) this.decoder()).listener()).setHandler( + this); + } + + private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers) + throws Http2Exception { try { NettyServerStream stream = new NettyServerStream(ctx.channel(), streamId, inboundFlow); - // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this method. + // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this + // method. Http2Stream http2Stream = connection().requireStream(streamId); http2Stream.data(stream); String method = determineMethod(streamId, headers); - ServerStreamListener listener = transportListener.streamCreated(stream, method, - Utils.convertHeaders(headers)); + ServerStreamListener listener = + transportListener.streamCreated(stream, method, Utils.convertHeaders(headers)); stream.setListener(listener); } catch (Http2Exception e) { throw e; @@ -85,12 +86,7 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler { } } - @Override - public void onDataRead(ChannelHandlerContext ctx, - int streamId, - ByteBuf data, - int padding, - boolean endOfStream) throws Http2Exception { + private void onDataRead(int streamId, ByteBuf data, boolean endOfStream) throws Http2Exception { try { NettyServerStream stream = serverStream(connection().requireStream(streamId)); stream.inboundDataReceived(data, endOfStream); @@ -102,9 +98,7 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler { } } - @Override - public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) - throws Http2Exception { + private void onRstStreamRead(int streamId) throws Http2Exception { try { NettyServerStream stream = serverStream(connection().requireStream(streamId)); stream.abortStream(Status.CANCELLED, false); @@ -116,25 +110,23 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler { } } - /** - * Handler for stream errors that have occurred during HTTP/2 frame processing. - * - *

When a callback method of this class throws an Http2StreamException, - * it will be handled by this method. Other types of exceptions will be handled by - * {@link #onConnectionError(ChannelHandlerContext, Http2Exception)} from the base class. The - * catch-all logic is in {@link #decode(ChannelHandlerContext, ByteBuf, List)} from the base class. - */ @Override - protected void onStreamError(ChannelHandlerContext ctx, Http2StreamException cause) { - // Aborts the stream with a status that contains the cause. - Http2Stream stream = connection().stream(cause.streamId()); - if (stream != null) { - // Send the error message to the client to help debugging. - serverStream(stream).abortStream(Status.fromThrowable(cause), true); - } else { - // Only call the base class if we cannot anything about it. - super.onStreamError(ctx, cause); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + // Force the conversion of any exceptions into HTTP/2 exceptions. + Http2Exception e = Http2CodecUtil.toHttp2Exception(cause); + if (e instanceof Http2StreamException) { + // Aborts the stream with a status that contains the cause. + Http2Stream stream = connection().stream(((Http2StreamException)cause).streamId()); + if (stream != null) { + // Send the error message to the client to help debugging. + serverStream(stream).abortStream(Status.fromThrowable(cause), true); + // We've already handled it, don't call the base class. + return; + } } + + // Delegate to the super class for proper handling of the Http2Exception. + super.exceptionCaught(ctx, e); } /** @@ -167,11 +159,11 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler { }); } // Call the base class to write the HTTP/2 DATA frame. - writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise); + encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise); ctx.flush(); } else if (msg instanceof SendResponseHeadersCommand) { SendResponseHeadersCommand cmd = (SendResponseHeadersCommand) msg; - writeHeaders(ctx, + encoder().writeHeaders(ctx, cmd.streamId(), new DefaultHttp2Headers() .status(STATUS_OK) @@ -214,4 +206,36 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler { private NettyServerStream serverStream(Http2Stream stream) { return stream.data(); } + + private static class LazyFrameListener extends Http2FrameAdapter { + private NettyServerHandler handler; + + void setHandler(NettyServerHandler handler) { + this.handler = handler; + } + + @Override + public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, + boolean endOfStream) throws Http2Exception { + handler.onDataRead(streamId, data, endOfStream); + } + + @Override + public void onHeadersRead(ChannelHandlerContext ctx, + int streamId, + Http2Headers headers, + int streamDependency, + short weight, + boolean exclusive, + int padding, + boolean endStream) throws Http2Exception { + handler.onHeadersRead(ctx, streamId, headers); + } + + @Override + public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) + throws Http2Exception { + handler.onRstStreamRead(streamId); + } + } } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/AsyncFrameWriter.java b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/AsyncFrameWriter.java index 2548ce28e9..0476c9db68 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/AsyncFrameWriter.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/AsyncFrameWriter.java @@ -1,5 +1,6 @@ package com.google.net.stubby.newtransport.okhttp; +import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.SerializingExecutor; import com.google.net.stubby.Status; @@ -12,6 +13,7 @@ import okio.Buffer; import java.io.IOException; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; class AsyncFrameWriter implements FrameWriter { @@ -177,12 +179,28 @@ class AsyncFrameWriter implements FrameWriter { @Override public void close() { - executor.execute(new WriteRunnable() { + // Wait for the frameWriter to close. + final SettableFuture closeFuture = SettableFuture.create(); + executor.execute(new Runnable() { @Override - public void doRun() throws IOException { - frameWriter.close(); + public void run() { + try { + frameWriter.close(); + } catch (IOException e) { + closeFuture.setException(e); + } finally { + closeFuture.set(null); + } } }); + try { + closeFuture.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } private abstract class WriteRunnable implements Runnable { diff --git a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java index 987bca406f..b30c9dde71 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java @@ -100,6 +100,8 @@ public class OkHttpClientTransport extends AbstractClientTransport { private boolean goAway; @GuardedBy("lock") private Status goAwayStatus; + @GuardedBy("lock") + private boolean stopped; OkHttpClientTransport(InetSocketAddress address, Executor executor) { this.address = Preconditions.checkNotNull(address); @@ -149,9 +151,9 @@ public class OkHttpClientTransport extends AbstractClientTransport { frameWriter = new AsyncFrameWriter(variant.newWriter(sink, true), this, executor); } - notifyStarted(); clientFrameHandler = new ClientFrameHandler(); executor.execute(clientFrameHandler); + notifyStarted(); } @Override @@ -161,10 +163,11 @@ public class OkHttpClientTransport extends AbstractClientTransport { normalClose = !goAway; } if (normalClose) { - abort(Status.INTERNAL.withDescription("Transport stopped")); // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams. // The GOAWAY is part of graceful shutdown. frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]); + + abort(Status.INTERNAL.withDescription("Transport stopped")); } stopIfNecessary(); } @@ -203,7 +206,10 @@ public class OkHttpClientTransport extends AbstractClientTransport { // Starting stop, go into STOPPING state so that Channel know this Transport should not be used // further, will become STOPPED once all streams are complete. - stopAsync(); + State state = state(); + if (state == State.RUNNING || state == State.NEW) { + stopAsync(); + } for (OkHttpClientStream stream : goAwayStreams) { stream.setStatus(status, new Metadata.Trailers()); @@ -233,8 +239,16 @@ public class OkHttpClientTransport extends AbstractClientTransport { boolean shouldStop; synchronized (lock) { shouldStop = (goAway && streams.size() == 0); + if (shouldStop) { + if (stopped) { + // We've already stopped, don't stop again. + shouldStop = false; + } + stopped = true; + } } if (shouldStop) { + // Wait for the frame writer to close. frameWriter.close(); try { frameReader.close();