Fixes api drift, addressing inbound flow control.

This commit is contained in:
Adrian Cole 2015-01-22 10:29:30 +08:00 committed by nmittler
parent ada32b0cb1
commit c4a43e6bdc
7 changed files with 51 additions and 66 deletions

@ -1 +1 @@
Subproject commit c9e5238ea61b753f4ebcd84249f853857d8c1eae
Subproject commit 1c6b3307becc3b0c19fdbac6a7058bd731a4db2c

View File

@ -31,6 +31,7 @@
package com.google.net.stubby.transport.netty;
import com.google.common.base.Preconditions;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
@ -48,10 +49,8 @@ import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2InboundFlowController;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamException;
import java.util.ArrayDeque;
import java.util.Deque;
@ -81,15 +80,17 @@ class NettyClientHandler extends Http2ConnectionHandler {
}
private final Deque<PendingStream> pendingStreams = new ArrayDeque<PendingStream>();
private final Http2LocalFlowController inboundFlow;
private Throwable connectionError;
private ChannelHandlerContext ctx;
public NettyClientHandler(Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
Http2InboundFlowController inboundFlow,
Http2OutboundFlowController outboundFlow) {
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, new LazyFrameListener());
Http2LocalFlowController inboundFlow) {
super(connection, frameReader, frameWriter, new LazyFrameListener());
this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow");
initListener();
// Disallow stream creation by the server.
@ -148,7 +149,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
void returnProcessedBytes(int streamId, int bytes) {
try {
Http2Stream http2Stream = connection().requireStream(streamId);
http2Stream.garbageCollector().returnProcessedBytes(ctx, bytes);
inboundFlow.consumeBytes(ctx, http2Stream, bytes);
} catch (Http2Exception e) {
throw new RuntimeException(e);
}
@ -215,7 +216,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
@Override
protected void onStreamError(ChannelHandlerContext ctx, Throwable cause,
Http2StreamException http2Ex) {
Http2Exception.StreamException http2Ex) {
// Close the stream with a status that contains the cause.
Http2Stream stream = connection().stream(http2Ex.streamId());
if (stream != null) {

View File

@ -56,8 +56,7 @@ import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameLogger;
@ -65,7 +64,6 @@ import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.ssl.SslContext;
import io.netty.util.internal.logging.InternalLogLevel;
@ -250,10 +248,8 @@ class NettyClientTransport extends AbstractClientTransport {
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
DefaultHttp2InboundFlowController inboundFlow =
new DefaultHttp2InboundFlowController(connection, frameWriter);
Http2OutboundFlowController outboundFlow =
new DefaultHttp2OutboundFlowController(connection, frameWriter);
return new NettyClientHandler(connection, frameReader, frameWriter, inboundFlow, outboundFlow);
DefaultHttp2LocalFlowController inboundFlow =
new DefaultHttp2LocalFlowController(connection, frameWriter);
return new NettyClientHandler(connection, frameReader, frameWriter, inboundFlow);
}
}

View File

@ -56,14 +56,13 @@ import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2InboundFlowController;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamException;
import io.netty.util.ReferenceCountUtil;
import java.util.logging.Level;
@ -82,6 +81,7 @@ class NettyServerHandler extends Http2ConnectionHandler {
private static final Status GOAWAY_STATUS = Status.UNAVAILABLE;
private final ServerTransportListener transportListener;
private final Http2LocalFlowController inboundFlow;
private Throwable connectionError;
private ChannelHandlerContext ctx;
private boolean teWarningLogged;
@ -90,10 +90,10 @@ class NettyServerHandler extends Http2ConnectionHandler {
Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
Http2InboundFlowController inboundFlow,
Http2OutboundFlowController outboundFlow) {
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, new LazyFrameListener());
Http2LocalFlowController inboundFlow) {
super(connection, frameReader, frameWriter, new LazyFrameListener());
this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener");
this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow");
initListener();
connection.local().allowPushTo(false);
}
@ -189,9 +189,9 @@ class NettyServerHandler extends Http2ConnectionHandler {
@Override
protected void onStreamError(ChannelHandlerContext ctx, Throwable cause,
Http2StreamException http2Ex) {
StreamException http2Ex) {
logger.log(Level.WARNING, "Stream Error", cause);
Http2Stream stream = connection().stream(http2Ex.streamId());
Http2Stream stream = connection().stream(http2Ex.streamId(http2Ex));
if (stream != null) {
// Abort the stream with a status to help the client with debugging.
// Don't need to send a RST_STREAM since the end-of-stream flag will
@ -240,7 +240,7 @@ class NettyServerHandler extends Http2ConnectionHandler {
void returnProcessedBytes(int streamId, int bytes) {
try {
Http2Stream http2Stream = connection().requireStream(streamId);
http2Stream.garbageCollector().returnProcessedBytes(ctx, bytes);
inboundFlow.consumeBytes(ctx, http2Stream, bytes);
} catch (Http2Exception e) {
throw new RuntimeException(e);
}
@ -305,25 +305,25 @@ class NettyServerHandler extends Http2ConnectionHandler {
});
}
private String determineMethod(int streamId, Http2Headers headers) throws Http2StreamException {
private String determineMethod(int streamId, Http2Headers headers) throws Http2Exception {
if (!HTTP_METHOD.equals(headers.method())) {
throw new Http2StreamException(streamId, Http2Error.REFUSED_STREAM,
String.format("Method '%s' is not supported", headers.method()));
throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM,
"Method '%s' is not supported", headers.method());
}
checkHeader(streamId, headers, CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC);
String methodName = TransportFrameUtil.getFullMethodNameFromPath(headers.path().toString());
if (methodName == null) {
throw new Http2StreamException(streamId, Http2Error.REFUSED_STREAM,
String.format("Malformatted path: %s", headers.path()));
throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM,
"Malformatted path: %s", headers.path());
}
return methodName;
}
private static void checkHeader(int streamId, Http2Headers headers,
AsciiString header, AsciiString expectedValue) throws Http2StreamException {
AsciiString header, AsciiString expectedValue) throws Http2Exception {
if (!expectedValue.equals(headers.get(header))) {
throw new Http2StreamException(streamId, Http2Error.REFUSED_STREAM, String.format(
"Header '%s'='%s', while '%s' is expected", header, headers.get(header), expectedValue));
throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM,
"Header '%s'='%s', while '%s' is expected", header, headers.get(header), expectedValue);
}
}
@ -334,8 +334,8 @@ class NettyServerHandler extends Http2ConnectionHandler {
return stream.getProperty(NettyServerStream.class);
}
private Http2StreamException newStreamException(int streamId, Throwable cause) {
return new Http2StreamException(streamId, Http2Error.INTERNAL_ERROR, cause.getMessage(), cause);
private Http2Exception newStreamException(int streamId, Throwable cause) {
return Http2Exception.streamError(streamId, Http2Error.INTERNAL_ERROR, cause.getMessage(), cause);
}
private static class LazyFrameListener extends Http2FrameAdapter {

View File

@ -42,15 +42,13 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.ssl.SslContext;
import io.netty.util.internal.logging.InternalLogLevel;
@ -129,15 +127,12 @@ class NettyServerTransport extends AbstractService {
Http2FrameWriter frameWriter =
new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
DefaultHttp2InboundFlowController inboundFlow =
new DefaultHttp2InboundFlowController(connection, frameWriter);
Http2OutboundFlowController outboundFlow =
new DefaultHttp2OutboundFlowController(connection, frameWriter);
DefaultHttp2LocalFlowController inboundFlow =
new DefaultHttp2LocalFlowController(connection, frameWriter);
return new NettyServerHandler(transportListener,
connection,
frameReader,
frameWriter,
inboundFlow,
outboundFlow);
inboundFlow);
}
}

View File

@ -60,15 +60,13 @@ import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Settings;
import org.junit.Before;
@ -314,15 +312,12 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
Http2Connection connection = new DefaultHttp2Connection(false);
Http2FrameReader frameReader = new DefaultHttp2FrameReader();
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
DefaultHttp2InboundFlowController inboundFlow =
new DefaultHttp2InboundFlowController(connection, frameWriter);
Http2OutboundFlowController outboundFlow =
new DefaultHttp2OutboundFlowController(connection, frameWriter);
DefaultHttp2LocalFlowController inboundFlow =
new DefaultHttp2LocalFlowController(connection, frameWriter);
return new NettyClientHandler(connection,
frameReader,
frameWriter,
inboundFlow,
outboundFlow);
inboundFlow);
}
private AsciiString as(String string) {

View File

@ -38,7 +38,7 @@ import static com.google.net.stubby.transport.netty.Utils.HTTP_METHOD;
import static com.google.net.stubby.transport.netty.Utils.TE_HEADER;
import static com.google.net.stubby.transport.netty.Utils.TE_TRAILERS;
import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
@ -68,15 +68,14 @@ import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2OutboundFlowController;
import io.netty.handler.codec.http2.Http2Settings;
import org.junit.Before;
@ -233,9 +232,10 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
handler.channelRead(ctx, badFrame());
// Verify the expected GO_AWAY frame was written.
Exception e = protocolError("Frame length 0 incorrect size for ping.");
Exception e = connectionError(Http2Error.PROTOCOL_ERROR,
"Frame length 0 incorrect size for ping.");
ByteBuf expected =
goAwayFrame(STREAM_ID, (int) Http2Error.PROTOCOL_ERROR.code(), toByteBuf(ctx, e));
goAwayFrame(STREAM_ID, (int) Http2Error.FRAME_SIZE_ERROR.code(), toByteBuf(ctx, e));
ByteBuf actual = captureWrite(ctx);
assertEquals(expected, actual);
@ -264,6 +264,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
.path(new AsciiString("/foo.bar"));
ByteBuf headersFrame = headersFrame(STREAM_ID, headers);
handler.channelRead(ctx, headersFrame);
ArgumentCaptor<NettyServerStream> streamCaptor =
ArgumentCaptor.forClass(NettyServerStream.class);
@SuppressWarnings("rawtypes")
@ -309,15 +310,12 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
Http2Connection connection = new DefaultHttp2Connection(true);
Http2FrameReader frameReader = new DefaultHttp2FrameReader();
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
DefaultHttp2InboundFlowController inboundFlow =
new DefaultHttp2InboundFlowController(connection, frameWriter);
Http2OutboundFlowController outboundFlow =
new DefaultHttp2OutboundFlowController(connection, frameWriter);
DefaultHttp2LocalFlowController inboundFlow =
new DefaultHttp2LocalFlowController(connection, frameWriter);
return new NettyServerHandler(transportListener,
connection,
frameReader,
frameWriter,
inboundFlow,
outboundFlow);
inboundFlow);
}
}