Upgrade GRPC Java/Netty to HTTP/2 draft 14.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=73648766
This commit is contained in:
nathanmittler 2014-08-19 12:26:11 -07:00 committed by Eric Anderson
parent 34aede347a
commit 42af07c64d
9 changed files with 36 additions and 50 deletions

View File

@ -72,7 +72,7 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
@Override @Override
public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, public void onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream, boolean endOfSegment) boolean endOfStream)
throws Http2Exception { throws Http2Exception {
Request request = requestRegistry.lookup(streamId); Request request = requestRegistry.lookup(streamId);
if (request == null) { if (request == null) {
@ -99,7 +99,7 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
@Override @Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive, int padding, int streamDependency, short weight, boolean exclusive, int padding,
boolean endStream, boolean endSegment) throws Http2Exception { boolean endStream) throws Http2Exception {
Request operation = requestRegistry.lookup(streamId); Request operation = requestRegistry.lookup(streamId);
if (operation == null) { if (operation == null) {
if (isClient()) { if (isClient()) {
@ -206,7 +206,7 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
} }
// Use Path to specify the operation // Use Path to specify the operation
String operationName = String operationName =
normalizeOperationName(headers.get(Http2Headers.HttpName.PATH.value())); normalizeOperationName(headers.get(Http2Headers.PseudoHeaderName.PATH.value()));
if (operationName == null) { if (operationName == null) {
return null; return null;
} }
@ -264,25 +264,24 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
this.ctx = ctx; this.ctx = ctx;
} }
public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream, public ChannelFuture writeData(int streamId, ByteBuf data, boolean endStream) {
boolean endSegment, boolean compressed) {
return Http2Codec.this.writeData(ctx, ctx.newPromise(), return Http2Codec.this.writeData(ctx, ctx.newPromise(),
streamId, data, PADDING, endStream, endSegment, compressed); streamId, data, PADDING, endStream);
} }
public ChannelFuture writeHeaders(int streamId, public ChannelFuture writeHeaders(int streamId,
Http2Headers headers, Http2Headers headers,
boolean endStream, boolean endSegment) { boolean endStream) {
return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId, return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId,
headers, PADDING, endStream, endSegment); headers, PADDING, endStream);
} }
public ChannelFuture writeHeaders(int streamId, Http2Headers headers, int streamDependency, public ChannelFuture writeHeaders(int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, short weight, boolean exclusive,
boolean endStream, boolean endSegment) { boolean endStream) {
return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId, return Http2Codec.this.writeHeaders(ctx, ctx.newPromise(), streamId,
headers, streamDependency, weight, exclusive, PADDING, endStream, endSegment); headers, streamDependency, weight, exclusive, PADDING, endStream);
} }
public ChannelFuture writeRstStream(int streamId, long errorCode) { public ChannelFuture writeRstStream(int streamId, long errorCode) {

View File

@ -56,7 +56,7 @@ abstract class Http2Operation extends AbstractOperation implements Framer.Sink {
try { try {
ChannelFuture channelFuture = writer.writeData(getId(), ChannelFuture channelFuture = writer.writeData(getId(),
Unpooled.wrappedBuffer(frame), closed, closed, false); Unpooled.wrappedBuffer(frame), closed);
if (!closed) { if (!closed) {
// Sync for all except the last frame to prevent buffer corruption. // Sync for all except the last frame to prevent buffer corruption.
channelFuture.get(); channelFuture.get();

View File

@ -42,7 +42,7 @@ class Http2Request extends Http2Operation implements Request {
.authority(HOST_NAME) .authority(HOST_NAME)
.scheme("https") .scheme("https")
.add("content-type", Http2Session.PROTORPC); .add("content-type", Http2Session.PROTORPC);
writer.writeHeaders(response.getId(), headersBuilder.build(), false, true); writer.writeHeaders(response.getId(), headersBuilder.build(), false);
this.response = response; this.response = response;
} }

View File

@ -31,6 +31,6 @@ class Http2Response extends Http2Operation implements Response {
super(id, writer, framer); super(id, writer, framer);
Http2Headers headers = DefaultHttp2Headers.newBuilder().status("200") Http2Headers headers = DefaultHttp2Headers.newBuilder().status("200")
.add("content-type", Http2Session.PROTORPC).build(); .add("content-type", Http2Session.PROTORPC).build();
writer.writeHeaders(id, headers, false, true); writer.writeHeaders(id, headers, false);
} }
} }

View File

@ -28,7 +28,6 @@ import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
@ -74,16 +73,22 @@ public class Http2Negotiator {
if (!installJettyTLSProtocolSelection(sslEngine, tlsNegotiatedHttp2)) { if (!installJettyTLSProtocolSelection(sslEngine, tlsNegotiatedHttp2)) {
throw new IllegalStateException("NPN/ALPN extensions not installed"); throw new IllegalStateException("NPN/ALPN extensions not installed");
} }
final CountDownLatch sslCompletion = new CountDownLatch(1);
final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(final SocketChannel ch) throws Exception {
SslHandler sslHandler = new SslHandler(sslEngine, false); SslHandler sslHandler = new SslHandler(sslEngine, false);
sslHandler.handshakeFuture().addListener( sslHandler.handshakeFuture().addListener(
new GenericFutureListener<Future<? super Channel>>() { new GenericFutureListener<Future<? super Channel>>() {
@Override @Override
public void operationComplete(Future<? super Channel> future) throws Exception { public void operationComplete(Future<? super Channel> future) throws Exception {
sslCompletion.countDown(); if (!future.isSuccess()) {
// Throw the exception.
if (tlsNegotiatedHttp2.isDone()) {
tlsNegotiatedHttp2.get();
} else {
future.get();
}
}
} }
}); });
ch.pipeline().addLast(sslHandler); ch.pipeline().addLast(sslHandler);
@ -100,10 +105,6 @@ public class Http2Negotiator {
@Override @Override
public void await(Channel channel) { public void await(Channel channel) {
try { try {
// Wait for SSL negotiation to complete
if (!sslCompletion.await(20, TimeUnit.SECONDS)) {
throw new IllegalStateException("Failed to negotiate TLS");
}
// Wait for NPN/ALPN negotation to complete. Will throw if failed. // Wait for NPN/ALPN negotation to complete. Will throw if failed.
tlsNegotiatedHttp2.get(5, TimeUnit.SECONDS); tlsNegotiatedHttp2.get(5, TimeUnit.SECONDS);
} catch (Exception e) { } catch (Exception e) {
@ -268,15 +269,15 @@ public class Http2Negotiator {
case "unsupported": case "unsupported":
// both // both
removeMethod.invoke(null, engine); removeMethod.invoke(null, engine);
protocolNegotiated.setException( protocolNegotiated.setException(new IllegalStateException(
new IllegalStateException("ALPN/NPN not supported by server")); "ALPN/NPN protocol " + HTTP_VERSION_NAME + " not supported by server"));
return null; return null;
case "protocols": case "protocols":
// ALPN only // ALPN only
return ImmutableList.of(HTTP_VERSION_NAME); return ImmutableList.of(HTTP_VERSION_NAME);
case "selected": case "selected":
// ALPN only // ALPN only
// Only 'supports' one protocol so we know what was 'selected. // Only 'supports' one protocol so we know what was selected.
removeMethod.invoke(null, engine); removeMethod.invoke(null, engine);
protocolNegotiated.set(null); protocolNegotiated.set(null);
return null; return null;

View File

@ -121,8 +121,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
short weight, short weight,
boolean exclusive, boolean exclusive,
int padding, int padding,
boolean endStream, boolean endStream) throws Http2Exception {
boolean endSegment) throws Http2Exception {
// TODO(user): Assuming that all headers fit in a single HEADERS frame. // TODO(user): Assuming that all headers fit in a single HEADERS frame.
NettyClientStream stream = clientStream(connection().requireStream(streamId)); NettyClientStream stream = clientStream(connection().requireStream(streamId));
stream.inboundHeadersRecieved(headers, endStream); stream.inboundHeadersRecieved(headers, endStream);
@ -136,8 +135,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
int streamId, int streamId,
ByteBuf data, ByteBuf data,
int padding, int padding,
boolean endOfStream, boolean endOfStream) throws Http2Exception {
boolean endOfSegment) throws Http2Exception {
NettyClientStream stream = clientStream(connection().requireStream(streamId)); NettyClientStream stream = clientStream(connection().requireStream(streamId));
// TODO(user): update flow controller to use a promise. // TODO(user): update flow controller to use a promise.
@ -260,9 +258,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
stream.id(), stream.id(),
cmd.content(), cmd.content(),
0, 0,
cmd.endStream(), cmd.endStream());
cmd.endSegment(),
false);
} }
/** /**
@ -337,7 +333,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
.add(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC) .add(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC)
.path("/" + pendingStream.method.getName()) .path("/" + pendingStream.method.getName())
.build(); .build();
writeHeaders(ctx(), ctx().newPromise(), streamId, headersBuilder.build(), 0, false, false) writeHeaders(ctx(), ctx().newPromise(), streamId, headersBuilder.build(), 0, false)
.addListener(new ChannelFutureListener() { .addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {

View File

@ -107,8 +107,7 @@ class NettyClientStream extends AbstractStream implements ClientStream {
@Override @Override
protected void sendFrame(ByteBuffer frame, boolean endOfStream) { protected void sendFrame(ByteBuffer frame, boolean endOfStream) {
SendGrpcFrameCommand cmd = SendGrpcFrameCommand cmd = new SendGrpcFrameCommand(this, toByteBuf(frame), endOfStream);
new SendGrpcFrameCommand(this, toByteBuf(frame), endOfStream, endOfStream);
channel.writeAndFlush(cmd); channel.writeAndFlush(cmd);
} }

View File

@ -12,14 +12,11 @@ import io.netty.buffer.DefaultByteBufHolder;
class SendGrpcFrameCommand extends DefaultByteBufHolder { class SendGrpcFrameCommand extends DefaultByteBufHolder {
private final NettyClientStream stream; private final NettyClientStream stream;
private final boolean endStream; private final boolean endStream;
private final boolean endSegment;
SendGrpcFrameCommand(NettyClientStream stream, ByteBuf content, boolean endStream, SendGrpcFrameCommand(NettyClientStream stream, ByteBuf content, boolean endStream) {
boolean endSegment) {
super(content); super(content);
this.stream = Preconditions.checkNotNull(stream, "stream"); this.stream = Preconditions.checkNotNull(stream, "stream");
this.endStream = endStream; this.endStream = endStream;
this.endSegment = endSegment;
} }
NettyClientStream stream() { NettyClientStream stream() {
@ -30,18 +27,14 @@ class SendGrpcFrameCommand extends DefaultByteBufHolder {
return endStream; return endStream;
} }
boolean endSegment() {
return endSegment;
}
@Override @Override
public ByteBufHolder copy() { public ByteBufHolder copy() {
return new SendGrpcFrameCommand(stream, content().copy(), endStream, endSegment); return new SendGrpcFrameCommand(stream, content().copy(), endStream);
} }
@Override @Override
public ByteBufHolder duplicate() { public ByteBufHolder duplicate() {
return new SendGrpcFrameCommand(stream, content().duplicate(), endStream, endSegment); return new SendGrpcFrameCommand(stream, content().duplicate(), endStream);
} }
@Override @Override

View File

@ -133,7 +133,6 @@ public class NettyClientHandlerTest {
eq(Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT), eq(Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT),
eq(false), eq(false),
eq(0), eq(0),
eq(false),
eq(false)); eq(false));
Http2Headers headers = captor.getValue(); Http2Headers headers = captor.getValue();
assertEquals("https", headers.scheme()); assertEquals("https", headers.scheme());
@ -166,7 +165,7 @@ public class NettyClientHandlerTest {
createStream(); createStream();
// Send a frame and verify that it was written. // Send a frame and verify that it was written.
handler.write(ctx, new SendGrpcFrameCommand(stream, content, true, true), promise); handler.write(ctx, new SendGrpcFrameCommand(stream, content, true), promise);
verify(promise, never()).setFailure(any(Throwable.class)); verify(promise, never()).setFailure(any(Throwable.class));
verify(ctx).writeAndFlush(any(ByteBuf.class), eq(promise)); verify(ctx).writeAndFlush(any(ByteBuf.class), eq(promise));
} }
@ -174,7 +173,7 @@ public class NettyClientHandlerTest {
@Test @Test
public void sendForUnknownStreamShouldFail() throws Exception { public void sendForUnknownStreamShouldFail() throws Exception {
when(stream.id()).thenReturn(3); when(stream.id()).thenReturn(3);
handler.write(ctx, new SendGrpcFrameCommand(stream, content, true, true), promise); handler.write(ctx, new SendGrpcFrameCommand(stream, content, true), promise);
verify(promise).setFailure(any(Throwable.class)); verify(promise).setFailure(any(Throwable.class));
} }
@ -215,7 +214,6 @@ public class NettyClientHandlerTest {
eq(Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT), eq(Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT),
eq(false), eq(false),
eq(0), eq(0),
eq(false),
eq(false)); eq(false));
} }
@ -251,7 +249,7 @@ public class NettyClientHandlerTest {
private ByteBuf headersFrame(int streamId, Http2Headers headers) { private ByteBuf headersFrame(int streamId, Http2Headers headers) {
ChannelHandlerContext ctx = newContext(); ChannelHandlerContext ctx = newContext();
frameWriter.writeHeaders(ctx, promise, streamId, headers, 0, false, false); frameWriter.writeHeaders(ctx, promise, streamId, headers, 0, false);
return captureWrite(ctx); return captureWrite(ctx);
} }
@ -259,7 +257,7 @@ public class NettyClientHandlerTest {
// Need to retain the content since the frameWriter releases it. // Need to retain the content since the frameWriter releases it.
content.retain(); content.retain();
ChannelHandlerContext ctx = newContext(); ChannelHandlerContext ctx = newContext();
frameWriter.writeData(ctx, newPromise(), streamId, content, 0, endStream, false); frameWriter.writeData(ctx, newPromise(), streamId, content, 0, endStream);
return captureWrite(ctx); return captureWrite(ctx);
} }