Upgrading to Netty 4.1.0.Beta8

A few things to note:

- ByteString has gone away in favor of AsciiString.

- Http2Headers now uses CharSequence for all methods, so there are a few places that we have to explicitly check for AsciiString to get the optimizations.

- We now have to specify a graceful shutdown timeout for our Netty handlers. Using 5 seconds.
This commit is contained in:
nmittler 2015-11-13 13:32:08 -08:00
parent 088def1985
commit ebed5a624a
12 changed files with 86 additions and 94 deletions

View File

@ -57,7 +57,7 @@ In Maven, you can use the [os-maven-plugin](https://github.com/trustin/os-maven-
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative</artifactId>
<version>1.1.33.Fork7</version>
<version>1.1.33.Fork9</version>
<classifier>${tcnative.classifier}</classifier>
</dependency>
</dependencies>
@ -125,7 +125,7 @@ if (osdetector.os == "linux" && osdetector.release.isLike("fedora")) {
}
dependencies {
compile 'io.netty:netty-tcnative:1.1.33.Fork7:' + tcnative_classifier
compile 'io.netty:netty-tcnative:1.1.33.Fork9:' + tcnative_classifier
}
```

View File

@ -38,7 +38,7 @@ dependencies {
libraries.mockito,
libraries.hdrhistogram,
libraries.netty_tcnative,
libraries.netty_transport_native_epoll
libraries.netty_epoll
}
configureProtoCompilation()

View File

@ -144,9 +144,9 @@ subprojects {
protobuf_nano: "com.google.protobuf.nano:protobuf-javanano:${protobufNanoVersion}",
protobuf_plugin: 'com.google.protobuf:protobuf-gradle-plugin:0.7.0',
netty: 'io.netty:netty-codec-http2:4.1.0.Beta6',
netty_tcnative: 'io.netty:netty-tcnative:1.1.33.Fork7:' + tcnative_suffix,
netty_transport_native_epoll: 'io.netty:netty-transport-native-epoll:4.1.0.Beta6' + epoll_suffix,
netty: 'io.netty:netty-codec-http2:4.1.0.Beta8',
netty_epoll: 'io.netty:netty-transport-native-epoll:4.1.0.Beta8' + epoll_suffix,
netty_tcnative: 'io.netty:netty-tcnative:1.1.33.Fork9:' + tcnative_suffix,
// Test dependencies.
junit: 'junit:junit:4.11',

View File

@ -32,6 +32,8 @@
package io.grpc.netty;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
@ -47,7 +49,7 @@ import io.netty.handler.codec.http2.Http2Stream;
* shutdown the connection) as well as sending the initial connection window at startup.
*/
abstract class AbstractNettyHandler extends Http2ConnectionHandler {
private static long GRACEFUL_SHUTDOWN_TIMEOUT = MILLISECONDS.convert(5, SECONDS);
private int initialConnectionWindow;
private ChannelHandlerContext ctx;
@ -56,6 +58,9 @@ abstract class AbstractNettyHandler extends Http2ConnectionHandler {
Http2Settings initialSettings) {
super(decoder, encoder, initialSettings);
// Set the timeout for graceful shutdown.
gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_TIMEOUT);
// TODO(nmittler): Use auto-refill once https://github.com/grpc/grpc-java/issues/1175 is fixed.
this.initialConnectionWindow = Integer.MAX_VALUE;
}
@ -81,7 +86,8 @@ abstract class AbstractNettyHandler extends Http2ConnectionHandler {
if (embedded == null) {
// Kill the connection instead of propagating the exceptionCaught(). Http2ConnectionHandler
// only handles Http2Exceptions and propagates everything else.
cause = Http2Exception.connectionError(Http2Error.INTERNAL_ERROR, cause, cause.getMessage());
String message = cause.getMessage() == null ? "Unknown error occurred" : cause.getMessage();
cause = Http2Exception.connectionError(Http2Error.INTERNAL_ERROR, cause, message);
}
super.exceptionCaught(ctx, cause);
}

View File

@ -105,12 +105,13 @@ class NettyClientHandler extends AbstractNettyHandler {
@VisibleForTesting
NettyClientHandler(BufferingHttp2ConnectionEncoder encoder, Http2Connection connection,
Http2FrameReader frameReader, int flowControlWindow, Ticker ticker) {
super(new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader,
new LazyFrameListener()), encoder, createInitialSettings(flowControlWindow));
Http2FrameReader frameReader, int flowControlWindow, Ticker ticker) {
super(new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader), encoder,
createInitialSettings(flowControlWindow));
this.ticker = ticker;
initListener();
// Set the frame listener on the decoder.
decoder().frameListener(new FrameListener());
streamKey = connection.newKey();
@ -180,10 +181,6 @@ class NettyClientHandler extends AbstractNettyHandler {
}
}
private void initListener() {
((LazyFrameListener) decoder().listener()).setHandler(this);
}
private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream)
throws Http2Exception {
NettyClientStream stream = clientStream(requireHttp2Stream(streamId));
@ -476,17 +473,11 @@ class NettyClientHandler extends AbstractNettyHandler {
return stream;
}
private static class LazyFrameListener extends Http2FrameAdapter {
private NettyClientHandler handler;
void setHandler(NettyClientHandler handler) {
this.handler = handler;
}
private class FrameListener extends Http2FrameAdapter {
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
handler.onDataRead(streamId, data, endOfStream);
NettyClientHandler.this.onDataRead(streamId, data, endOfStream);
return padding;
}
@ -499,23 +490,23 @@ class NettyClientHandler extends AbstractNettyHandler {
boolean exclusive,
int padding,
boolean endStream) throws Http2Exception {
handler.onHeadersRead(streamId, headers, endStream);
NettyClientHandler.this.onHeadersRead(streamId, headers, endStream);
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
handler.onRstStreamRead(streamId, errorCode);
NettyClientHandler.this.onRstStreamRead(streamId, errorCode);
}
@Override public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data)
throws Http2Exception {
Http2Ping p = handler.ping;
Http2Ping p = ping;
if (p != null) {
long ackPayload = data.readLong();
if (p.payload() == ackPayload) {
p.complete();
handler.ping = null;
ping = null;
} else {
logger.log(Level.WARNING, String.format("Received unexpected ping ack. "
+ "Expecting %d, got %d", p.payload(), ackPayload));

View File

@ -31,7 +31,6 @@
package io.grpc.netty;
import static com.google.common.base.Charsets.US_ASCII;
import static com.google.common.base.Preconditions.checkArgument;
import static io.grpc.netty.Utils.CONTENT_TYPE_GRPC;
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
@ -65,7 +64,6 @@ import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamVisitor;
import io.netty.util.ByteString;
import io.netty.util.ReferenceCountUtil;
import java.util.logging.Level;
@ -78,7 +76,6 @@ import javax.annotation.Nullable;
* the context of the Netty Channel thread.
*/
class NettyServerHandler extends AbstractNettyHandler {
private static Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
private static final Status GOAWAY_STATUS = Status.UNAVAILABLE;
@ -98,21 +95,21 @@ class NettyServerHandler extends AbstractNettyHandler {
int flowControlWindow,
int maxMessageSize) {
this(transportListener, new DefaultHttp2ConnectionEncoder(connection, frameWriter), frameReader,
createInitialSettings(flowControlWindow, maxStreams), maxMessageSize);
createInitialSettings(flowControlWindow, maxStreams), maxMessageSize);
}
private NettyServerHandler(ServerTransportListener transportListener,
Http2ConnectionEncoder encoder,
Http2FrameReader frameReader, Http2Settings settings,
int maxMessageSize) {
super(new DefaultHttp2ConnectionDecoder(encoder.connection(), encoder, frameReader,
new LazyFrameListener()), encoder, settings);
super(new DefaultHttp2ConnectionDecoder(encoder.connection(), encoder, frameReader), encoder,
settings);
checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0");
this.maxMessageSize = maxMessageSize;
streamKey = encoder.connection().newKey();
this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener");
initListener();
decoder().frameListener(new FrameListener());
}
private static Http2Settings createInitialSettings(int flowControlWindow, int maxStreams) {
@ -128,10 +125,6 @@ class NettyServerHandler extends AbstractNettyHandler {
return connectionError;
}
private void initListener() {
((LazyFrameListener) decoder().listener()).setHandler(this);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
serverWriteQueue = new WriteQueue(ctx.channel());
@ -333,12 +326,12 @@ class NettyServerHandler extends AbstractNettyHandler {
}
private void verifyContentType(int streamId, Http2Headers headers) throws Http2Exception {
ByteString contentType = headers.get(CONTENT_TYPE_HEADER);
CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
if (contentType == null) {
throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM,
"Content-Type is missing from the request");
}
String contentTypeString = contentType.toString(US_ASCII);
String contentTypeString = contentType.toString();
if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM,
"Content-Type '%s' is not supported", contentTypeString);
@ -361,16 +354,18 @@ class NettyServerHandler extends AbstractNettyHandler {
}
checkHeader(streamId, headers, CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC);
// Remove the leading slash of the path and get the fully qualified method name
ByteString path = headers.path();
if (path.byteAt(0) != '/') {
CharSequence path = headers.path();
if (path.charAt(0) != '/') {
throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM,
"Malformatted path: %s", path);
}
return path.toString(1, path.length());
return path.subSequence(1, path.length()).toString();
}
private static void checkHeader(int streamId, Http2Headers headers,
ByteString header, ByteString expectedValue) throws Http2Exception {
private static void checkHeader(int streamId,
Http2Headers headers,
CharSequence header,
CharSequence expectedValue) throws Http2Exception {
if (!expectedValue.equals(headers.get(header))) {
throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM,
"Header '%s'='%s', while '%s' is expected", header, headers.get(header), expectedValue);
@ -389,17 +384,12 @@ class NettyServerHandler extends AbstractNettyHandler {
streamId, Http2Error.INTERNAL_ERROR, cause, cause.getMessage());
}
private static class LazyFrameListener extends Http2FrameAdapter {
private NettyServerHandler handler;
void setHandler(NettyServerHandler handler) {
this.handler = handler;
}
private class FrameListener extends Http2FrameAdapter {
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
handler.onDataRead(streamId, data, endOfStream);
NettyServerHandler.this.onDataRead(streamId, data, endOfStream);
return padding;
}
@ -412,13 +402,13 @@ class NettyServerHandler extends AbstractNettyHandler {
boolean exclusive,
int padding,
boolean endStream) throws Http2Exception {
handler.onHeadersRead(ctx, streamId, headers);
NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
}
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
handler.onRstStreamRead(streamId);
NettyServerHandler.this.onRstStreamRead(streamId);
}
}
}

View File

@ -34,7 +34,7 @@ package io.grpc.netty;
import io.grpc.Internal;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.util.ByteString;
import io.netty.util.AsciiString;
/**
* A class that provides a Netty handler to control protocol negotiation.
@ -49,7 +49,7 @@ public interface ProtocolNegotiator {
/**
* The HTTP/2 scheme to be used when sending {@code HEADERS}.
*/
ByteString scheme();
AsciiString scheme();
}
/**

View File

@ -59,7 +59,7 @@ import io.netty.handler.ssl.OpenSslEngine;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.util.ByteString;
import io.netty.util.AsciiString;
import java.net.URI;
import java.util.ArrayDeque;
@ -108,7 +108,7 @@ public final class ProtocolNegotiators {
}
@Override
public ByteString scheme() {
public AsciiString scheme() {
return Utils.HTTP;
}
};
@ -183,7 +183,7 @@ public final class ProtocolNegotiators {
}
@Override
public ByteString scheme() {
public AsciiString scheme() {
return Utils.HTTPS;
}
}
@ -488,7 +488,7 @@ public final class ProtocolNegotiators {
}
@Override
public ByteString scheme() {
public AsciiString scheme() {
return Utils.HTTPS;
}
@ -527,7 +527,7 @@ public final class ProtocolNegotiators {
}
@Override
public ByteString scheme() {
public AsciiString scheme() {
return Utils.HTTP;
}
@ -554,7 +554,7 @@ public final class ProtocolNegotiators {
}
@Override
public ByteString scheme() {
public AsciiString scheme() {
return Utils.HTTP;
}

View File

@ -34,6 +34,7 @@ package io.grpc.netty;
import static io.grpc.internal.GrpcUtil.AUTHORITY_KEY;
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
import static io.netty.util.CharsetUtil.US_ASCII;
import static io.netty.util.CharsetUtil.UTF_8;
import com.google.common.base.Preconditions;
@ -47,7 +48,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.ByteString;
import io.netty.util.AsciiString;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
@ -62,18 +63,15 @@ import java.util.concurrent.TimeUnit;
*/
class Utils {
public static final ByteString STATUS_OK = new ByteString("200".getBytes(UTF_8));
public static final ByteString HTTP_METHOD = new ByteString(GrpcUtil.HTTP_METHOD.getBytes(UTF_8));
public static final ByteString HTTPS = new ByteString("https".getBytes(UTF_8));
public static final ByteString HTTP = new ByteString("http".getBytes(UTF_8));
public static final ByteString CONTENT_TYPE_HEADER =
new ByteString(CONTENT_TYPE_KEY.name().getBytes(UTF_8));
public static final ByteString CONTENT_TYPE_GRPC =
new ByteString(GrpcUtil.CONTENT_TYPE_GRPC.getBytes(UTF_8));
public static final ByteString TE_HEADER = new ByteString("te".getBytes(UTF_8));
public static final ByteString TE_TRAILERS = new ByteString(GrpcUtil.TE_TRAILERS.getBytes(UTF_8));
public static final ByteString USER_AGENT =
new ByteString(USER_AGENT_KEY.name().getBytes(UTF_8));
public static final AsciiString STATUS_OK = AsciiString.of("200");
public static final AsciiString HTTP_METHOD = AsciiString.of(GrpcUtil.HTTP_METHOD);
public static final AsciiString HTTPS = AsciiString.of("https");
public static final AsciiString HTTP = AsciiString.of("http");
public static final AsciiString CONTENT_TYPE_HEADER = AsciiString.of(CONTENT_TYPE_KEY.name());
public static final AsciiString CONTENT_TYPE_GRPC = AsciiString.of(GrpcUtil.CONTENT_TYPE_GRPC);
public static final AsciiString TE_HEADER = AsciiString.of("te");
public static final AsciiString TE_TRAILERS = AsciiString.of(GrpcUtil.TE_TRAILERS);
public static final AsciiString USER_AGENT = AsciiString.of(USER_AGENT_KEY.name());
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP =
new DefaultEventLoopGroupResource(1, "grpc-default-boss-ELG");
@ -90,17 +88,26 @@ class Utils {
// arbitrary binary data, not just ASCII.
byte[][] headerValues = new byte[http2Headers.size() * 2][];
int i = 0;
for (Map.Entry<ByteString, ByteString> entry : http2Headers) {
headerValues[i++] = entry.getKey().array();
headerValues[i++] = entry.getValue().array();
for (Map.Entry<CharSequence, CharSequence> entry : http2Headers) {
headerValues[i++] = bytes(entry.getKey());
headerValues[i++] = bytes(entry.getValue());
}
return TransportFrameUtil.toRawSerializedHeaders(headerValues);
}
private static byte[] bytes(CharSequence seq) {
if (seq instanceof AsciiString) {
// Fast path - no copy.
return ((AsciiString) seq).array();
}
// Slow path - copy.
return seq.toString().getBytes(UTF_8);
}
public static Http2Headers convertClientHeaders(Metadata headers,
ByteString scheme,
ByteString defaultPath,
ByteString defaultAuthority) {
AsciiString scheme,
AsciiString defaultPath,
AsciiString defaultAuthority) {
Preconditions.checkNotNull(defaultPath, "defaultPath");
Preconditions.checkNotNull(defaultAuthority, "defaultAuthority");
// Add any application-provided headers first.
@ -116,12 +123,12 @@ class Utils {
// Override the default authority and path if provided by the headers.
if (headers.containsKey(AUTHORITY_KEY)) {
http2Headers.authority(new ByteString(headers.get(AUTHORITY_KEY).getBytes(UTF_8)));
http2Headers.authority(new AsciiString(headers.get(AUTHORITY_KEY).getBytes(UTF_8)));
}
// Set the User-Agent header.
String userAgent = GrpcUtil.getGrpcUserAgent("netty", headers.get(USER_AGENT_KEY));
http2Headers.set(USER_AGENT, new ByteString(userAgent.getBytes(UTF_8)));
http2Headers.set(USER_AGENT, new AsciiString(userAgent.getBytes(UTF_8)));
return http2Headers;
}
@ -151,8 +158,8 @@ class Utils {
Http2Headers http2Headers = new DefaultHttp2Headers();
byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(headers);
for (int i = 0; i < serializedHeaders.length; i += 2) {
ByteString name = new ByteString(serializedHeaders[i], false);
ByteString value = new ByteString(serializedHeaders[i + 1], false);
AsciiString name = new AsciiString(serializedHeaders[i], false);
AsciiString value = new AsciiString(serializedHeaders[i + 1], false);
http2Headers.add(name, value);
}

View File

@ -111,9 +111,10 @@ public class BufferingHttp2ConnectionEncoderTest {
new DefaultHttp2ConnectionEncoder(connection, writer);
encoder = new BufferingHttp2ConnectionEncoder(defaultEncoder);
DefaultHttp2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
new DefaultHttp2FrameReader(), frameListener);
new DefaultHttp2FrameReader());
decoder.frameListener(frameListener);
Http2ConnectionHandler handler = new Http2ConnectionHandler(decoder, encoder);
Http2ConnectionHandler handler = new Http2ConnectionHandler.Builder().build(decoder, encoder);
channel = new EmbeddedChannel(handler);
ctx = channel.pipeline().context(handler);
}

View File

@ -55,14 +55,12 @@ import static org.mockito.Mockito.when;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.AsciiString;
import io.netty.util.ByteString;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -265,10 +263,10 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
// Set stream id to indicate it has been created
stream().id(STREAM_ID);
Http2Headers headers = new DefaultHttp2Headers().status(STATUS_OK).set(CONTENT_TYPE_HEADER,
new ByteString("application/bad", UTF_8));
new AsciiString("application/bad", UTF_8));
stream().transportHeadersReceived(headers, false);
Http2Headers trailers = new DefaultHttp2Headers()
.set(new ByteString("grpc-status", UTF_8), new ByteString("0", UTF_8));
.set(new AsciiString("grpc-status", UTF_8), new AsciiString("0", UTF_8));
stream().transportHeadersReceived(trailers, true);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).closed(captor.capture(), any(Metadata.class));

View File

@ -80,7 +80,6 @@ import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString;
import io.netty.util.ByteString;
import org.junit.Before;
import org.junit.Ignore;
@ -278,7 +277,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
public void headersWithInvalidContentTypeShouldFail() throws Exception {
Http2Headers headers = new DefaultHttp2Headers()
.method(HTTP_METHOD)
.set(CONTENT_TYPE_HEADER, new ByteString("application/bad", UTF_8))
.set(CONTENT_TYPE_HEADER, new AsciiString("application/bad", UTF_8))
.set(TE_HEADER, TE_TRAILERS)
.path(new AsciiString("/foo/bar"));
ByteBuf headersFrame = headersFrame(STREAM_ID, headers);