diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java index 925adb7159..ab7884a877 100644 --- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java +++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java @@ -72,7 +72,7 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler { @Override public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, - boolean endOfStream, boolean endOfSegment) + boolean endOfStream) throws Http2Exception { Request request = requestRegistry.lookup(streamId); if (request == null) { @@ -99,7 +99,7 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler { @Override public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, - boolean endStream, boolean endSegment) throws Http2Exception { + boolean endStream) throws Http2Exception { Request operation = requestRegistry.lookup(streamId); if (operation == null) { if (isClient()) { @@ -206,7 +206,7 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler { } // Use Path to specify the operation String operationName = - normalizeOperationName(headers.get(Http2Headers.HttpName.PATH.value())); + normalizeOperationName(headers.get(Http2Headers.PseudoHeaderName.PATH.value())); if (operationName == null) { return null; } @@ -264,25 +264,24 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler { this.ctx = ctx; } - public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream, - boolean endSegment, boolean compressed) { + public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream) { return Http2Codec.this.writeData(ctx, ctx.newPromise(), - streamId, data, PADDING, endStream, endSegment, compressed); + streamId, data, PADDING, endStream); } public ChannelFuture writeHeaders(int streamId, Http2Headers headers, - boolean endStream, boolean endSegment) { + boolean endStream) { return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId, - headers, PADDING, endStream, endSegment); + headers, PADDING, endStream); } public ChannelFuture writeHeaders(int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, - boolean endStream, boolean endSegment) { + boolean endStream) { return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId, - headers, streamDependency, weight, exclusive, PADDING, endStream, endSegment); + headers, streamDependency, weight, exclusive, PADDING, endStream); } public ChannelFuture writeRstStream(int streamId, long errorCode) { diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java index 6e4c82db4d..9a17a7083f 100644 --- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java +++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java @@ -56,7 +56,7 @@ abstract class Http2Operation extends AbstractOperation implements Framer.Sink { try { ChannelFuture channelFuture = writer.writeData(getId(), - Unpooled.wrappedBuffer(frame), closed, closed, false); + Unpooled.wrappedBuffer(frame), closed); if (!closed) { // Sync for all except the last frame to prevent buffer corruption. channelFuture.get(); diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java index e14fdd728e..0d6d2d5c0c 100644 --- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java +++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java @@ -42,7 +42,7 @@ class Http2Request extends Http2Operation implements Request { .authority(HOST_NAME) .scheme("https") .add("content-type", Http2Session.PROTORPC); - writer.writeHeaders(response.getId(), headersBuilder.build(), false, true); + writer.writeHeaders(response.getId(), headersBuilder.build(), false); this.response = response; } diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java index 818e8bcc10..1a961269fb 100644 --- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java +++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java @@ -31,6 +31,6 @@ class Http2Response extends Http2Operation implements Response { super(id, writer, framer); Http2Headers headers = DefaultHttp2Headers.newBuilder().status("200") .add("content-type", Http2Session.PROTORPC).build(); - writer.writeHeaders(id, headers, false, true); + writer.writeHeaders(id, headers, false); } } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/Http2Negotiator.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/Http2Negotiator.java index 0a046dac69..7cedfbba16 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/Http2Negotiator.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/Http2Negotiator.java @@ -28,7 +28,6 @@ import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLEngine; @@ -74,16 +73,22 @@ public class Http2Negotiator { if (!installJettyTLSProtocolSelection(sslEngine, tlsNegotiatedHttp2)) { throw new IllegalStateException("NPN/ALPN extensions not installed"); } - final CountDownLatch sslCompletion = new CountDownLatch(1); final ChannelInitializer initializer = new ChannelInitializer() { @Override - public void initChannel(SocketChannel ch) throws Exception { + public void initChannel(final SocketChannel ch) throws Exception { SslHandler sslHandler = new SslHandler(sslEngine, false); sslHandler.handshakeFuture().addListener( new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { - sslCompletion.countDown(); + if (!future.isSuccess()) { + // Throw the exception. + if (tlsNegotiatedHttp2.isDone()) { + tlsNegotiatedHttp2.get(); + } else { + future.get(); + } + } } }); ch.pipeline().addLast(sslHandler); @@ -100,10 +105,6 @@ public class Http2Negotiator { @Override public void await(Channel channel) { try { - // Wait for SSL negotiation to complete - if (!sslCompletion.await(20, TimeUnit.SECONDS)) { - throw new IllegalStateException("Failed to negotiate TLS"); - } // Wait for NPN/ALPN negotation to complete. Will throw if failed. tlsNegotiatedHttp2.get(5, TimeUnit.SECONDS); } catch (Exception e) { @@ -268,15 +269,15 @@ public class Http2Negotiator { case "unsupported": // both removeMethod.invoke(null, engine); - protocolNegotiated.setException( - new IllegalStateException("ALPN/NPN not supported by server")); + protocolNegotiated.setException(new IllegalStateException( + "ALPN/NPN protocol " + HTTP_VERSION_NAME + " not supported by server")); return null; case "protocols": // ALPN only return ImmutableList.of(HTTP_VERSION_NAME); case "selected": // ALPN only - // Only 'supports' one protocol so we know what was 'selected. + // Only 'supports' one protocol so we know what was selected. removeMethod.invoke(null, engine); protocolNegotiated.set(null); return null; diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java index 7a831494f9..254118d69e 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java @@ -121,8 +121,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { short weight, boolean exclusive, int padding, - boolean endStream, - boolean endSegment) throws Http2Exception { + boolean endStream) throws Http2Exception { // TODO(user): Assuming that all headers fit in a single HEADERS frame. NettyClientStream stream = clientStream(connection().requireStream(streamId)); stream.inboundHeadersRecieved(headers, endStream); @@ -136,8 +135,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { int streamId, ByteBuf data, int padding, - boolean endOfStream, - boolean endOfSegment) throws Http2Exception { + boolean endOfStream) throws Http2Exception { NettyClientStream stream = clientStream(connection().requireStream(streamId)); // TODO(user): update flow controller to use a promise. @@ -260,9 +258,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { stream.id(), cmd.content(), 0, - cmd.endStream(), - cmd.endSegment(), - false); + cmd.endStream()); } /** @@ -337,7 +333,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler { .add(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC) .path("/" + pendingStream.method.getName()) .build(); - writeHeaders(ctx(), ctx().newPromise(), streamId, headersBuilder.build(), 0, false, false) + writeHeaders(ctx(), ctx().newPromise(), streamId, headersBuilder.build(), 0, false) .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java index 53721337ac..fc15c8de92 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java @@ -107,8 +107,7 @@ class NettyClientStream extends AbstractStream implements ClientStream { @Override protected void sendFrame(ByteBuffer frame, boolean endOfStream) { - SendGrpcFrameCommand cmd = - new SendGrpcFrameCommand(this, toByteBuf(frame), endOfStream, endOfStream); + SendGrpcFrameCommand cmd = new SendGrpcFrameCommand(this, toByteBuf(frame), endOfStream); channel.writeAndFlush(cmd); } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/SendGrpcFrameCommand.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/SendGrpcFrameCommand.java index d61f862425..7c348b8687 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/SendGrpcFrameCommand.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/SendGrpcFrameCommand.java @@ -12,14 +12,11 @@ import io.netty.buffer.DefaultByteBufHolder; class SendGrpcFrameCommand extends DefaultByteBufHolder { private final NettyClientStream stream; private final boolean endStream; - private final boolean endSegment; - SendGrpcFrameCommand(NettyClientStream stream, ByteBuf content, boolean endStream, - boolean endSegment) { + SendGrpcFrameCommand(NettyClientStream stream, ByteBuf content, boolean endStream) { super(content); this.stream = Preconditions.checkNotNull(stream, "stream"); this.endStream = endStream; - this.endSegment = endSegment; } NettyClientStream stream() { @@ -30,18 +27,14 @@ class SendGrpcFrameCommand extends DefaultByteBufHolder { return endStream; } - boolean endSegment() { - return endSegment; - } - @Override public ByteBufHolder copy() { - return new SendGrpcFrameCommand(stream, content().copy(), endStream, endSegment); + return new SendGrpcFrameCommand(stream, content().copy(), endStream); } @Override public ByteBufHolder duplicate() { - return new SendGrpcFrameCommand(stream, content().duplicate(), endStream, endSegment); + return new SendGrpcFrameCommand(stream, content().duplicate(), endStream); } @Override diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientHandlerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientHandlerTest.java index 39dfe02f45..528c343292 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientHandlerTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientHandlerTest.java @@ -133,7 +133,6 @@ public class NettyClientHandlerTest { eq(Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), - eq(false), eq(false)); Http2Headers headers = captor.getValue(); assertEquals("https", headers.scheme()); @@ -166,7 +165,7 @@ public class NettyClientHandlerTest { createStream(); // Send a frame and verify that it was written. - handler.write(ctx, new SendGrpcFrameCommand(stream, content, true, true), promise); + handler.write(ctx, new SendGrpcFrameCommand(stream, content, true), promise); verify(promise, never()).setFailure(any(Throwable.class)); verify(ctx).writeAndFlush(any(ByteBuf.class), eq(promise)); } @@ -174,7 +173,7 @@ public class NettyClientHandlerTest { @Test public void sendForUnknownStreamShouldFail() throws Exception { when(stream.id()).thenReturn(3); - handler.write(ctx, new SendGrpcFrameCommand(stream, content, true, true), promise); + handler.write(ctx, new SendGrpcFrameCommand(stream, content, true), promise); verify(promise).setFailure(any(Throwable.class)); } @@ -215,7 +214,6 @@ public class NettyClientHandlerTest { eq(Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), - eq(false), eq(false)); } @@ -251,7 +249,7 @@ public class NettyClientHandlerTest { private ByteBuf headersFrame(int streamId, Http2Headers headers) { ChannelHandlerContext ctx = newContext(); - frameWriter.writeHeaders(ctx, promise, streamId, headers, 0, false, false); + frameWriter.writeHeaders(ctx, promise, streamId, headers, 0, false); return captureWrite(ctx); } @@ -259,7 +257,7 @@ public class NettyClientHandlerTest { // Need to retain the content since the frameWriter releases it. content.retain(); ChannelHandlerContext ctx = newContext(); - frameWriter.writeData(ctx, newPromise(), streamId, content, 0, endStream, false); + frameWriter.writeData(ctx, newPromise(), streamId, content, 0, endStream); return captureWrite(ctx); }