Updating gRPC code to the latest Netty version which supports binary headers.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=75853353
This commit is contained in:
nathanmittler 2014-09-18 16:10:01 -07:00 committed by Eric Anderson
parent ef2129c7d2
commit fc7a052c07
13 changed files with 153 additions and 130 deletions

View File

@ -15,6 +15,7 @@ import com.google.net.stubby.transport.Transport.Code;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandler;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.Http2Connection;
@ -23,12 +24,13 @@ import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import java.util.Map;
/**
* Codec used by clients and servers to interpret HTTP2 frames in the context of an ongoing
* request-response dialog
*/
public class Http2Codec extends AbstractHttp2ConnectionHandler {
public static final int PADDING = 0;
private final RequestRegistry requestRegistry;
private final Session session;
@ -201,16 +203,26 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
* Start the Request operation on the server
*/
private Request serverStart(ChannelHandlerContext ctx, int streamId, Http2Headers headers) {
if (!Http2Session.PROTORPC.equals(headers.get("content-type"))) {
if (!Http2Session.PROTORPC.equals(headers.get(Http2Session.CONTENT_TYPE))) {
return null;
}
// Use Path to specify the operation
String operationName =
normalizeOperationName(headers.get(Http2Headers.PseudoHeaderName.PATH.value()));
normalizeOperationName(headers.get(Http2Headers.PseudoHeaderName.PATH.value()).toString());
if (operationName == null) {
return null;
}
Metadata.Headers grpcHeaders = new Metadata.Headers(headers);
// The Netty AsciiString class is really just a wrapper around a byte[] and supports
// arbitrary binary data, not just ASCII.
byte[][] headerValues = new byte[headers.size() * 2][];
int i = 0;
for (Map.Entry<AsciiString, AsciiString> entry : headers) {
headerValues[i++] = entry.getKey().array();
headerValues[i++] = entry.getValue().array();
}
Metadata.Headers grpcHeaders = new Metadata.Headers(headerValues);
// Create the operation and bind a HTTP2 response operation
Request op = session.startRequest(operationName, grpcHeaders,
createResponse(new Http2Writer(ctx), streamId));

View File

@ -5,7 +5,9 @@ import com.google.net.stubby.Request;
import com.google.net.stubby.Response;
import com.google.net.stubby.transport.Framer;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import java.net.InetAddress;
import java.net.UnknownHostException;
@ -14,9 +16,10 @@ import java.net.UnknownHostException;
* A HTTP2 based implementation of {@link Request}
*/
class Http2Request extends Http2Operation implements Request {
private static final AsciiString POST = new AsciiString("POST");
private static final AsciiString HOST_NAME;
private static final AsciiString HTTPS = new AsciiString("https");
// TODO(user): Inject this
private static final String HOST_NAME;
static {
String hostName;
try {
@ -24,7 +27,7 @@ class Http2Request extends Http2Operation implements Request {
} catch (UnknownHostException uhe) {
hostName = "localhost";
}
HOST_NAME = hostName;
HOST_NAME = new AsciiString(hostName);
}
private final Response response;
@ -33,19 +36,18 @@ class Http2Request extends Http2Operation implements Request {
Metadata.Headers headers,
Http2Codec.Http2Writer writer, Framer framer) {
super(response.getId(), writer, framer);
DefaultHttp2Headers.Builder headersBuilder = DefaultHttp2Headers.newBuilder();
// TODO(user) Switch the ASCII requirement to false once Netty supports binary
// headers.
String[] headerValues = headers.serializeAscii();
Http2Headers http2Headers = new DefaultHttp2Headers();
byte[][] headerValues = headers.serialize();
for (int i = 0; i < headerValues.length; i++) {
headersBuilder.add(headerValues[i], headerValues[++i]);
http2Headers.add(new AsciiString(headerValues[i], false),
new AsciiString(headerValues[++i], false));
}
headersBuilder.method("POST")
.path("/" + operationName)
http2Headers.method(POST)
.path(new AsciiString("/" + operationName))
.authority(HOST_NAME)
.scheme("https")
.add("content-type", Http2Session.PROTORPC);
writer.writeHeaders(response.getId(), headersBuilder.build(), false);
.scheme(HTTPS)
.add(Http2Session.CONTENT_TYPE, Http2Session.PROTORPC);
writer.writeHeaders(response.getId(), http2Headers, false);
this.response = response;
}

View File

@ -3,14 +3,16 @@ package com.google.net.stubby.http2.netty;
import com.google.net.stubby.Response;
import com.google.net.stubby.transport.Framer;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
/**
* A HTTP2 based implementation of a {@link Response}.
*/
class Http2Response extends Http2Operation implements Response {
private static final AsciiString STATUS_OK = new AsciiString("200");
public static ResponseBuilder builder(final int id, final Http2Codec.Http2Writer writer,
final Framer framer) {
@ -29,8 +31,8 @@ class Http2Response extends Http2Operation implements Response {
private Http2Response(int id, Http2Codec.Http2Writer writer, Framer framer) {
super(id, writer, framer);
Http2Headers headers = DefaultHttp2Headers.newBuilder().status("200")
.add("content-type", Http2Session.PROTORPC).build();
Http2Headers headers = new DefaultHttp2Headers().status(STATUS_OK)
.add(Http2Session.CONTENT_TYPE, Http2Session.PROTORPC);
writer.writeHeaders(id, headers, false);
}
}

View File

@ -7,6 +7,8 @@ import com.google.net.stubby.Response;
import com.google.net.stubby.Session;
import com.google.net.stubby.transport.MessageFramer;
import io.netty.handler.codec.AsciiString;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -15,7 +17,8 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class Http2Session implements Session {
public static final String PROTORPC = "application/protorpc";
public static final AsciiString CONTENT_TYPE = new AsciiString("content-type");
public static final AsciiString PROTORPC = new AsciiString("application/protorpc");
private final Http2Codec.Http2Writer writer;
private final RequestRegistry requestRegistry;

View File

@ -1,7 +1,6 @@
package com.google.net.stubby.newtransport;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
/**
* A observer of a server-side transport for stream creation events.

View File

@ -9,13 +9,14 @@ import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.AbstractClientStream;
import com.google.net.stubby.newtransport.GrpcDeframer;
import com.google.net.stubby.newtransport.MessageDeframer2;
import com.google.net.stubby.newtransport.HttpUtil;
import com.google.net.stubby.newtransport.MessageDeframer2;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.handler.codec.http2.Http2Headers;
@ -148,8 +149,8 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
return true;
}
String contentType = headers.get(HttpUtil.CONTENT_TYPE_HEADER);
return HttpUtil.CONTENT_TYPE_PROTORPC.equalsIgnoreCase(contentType);
AsciiString contentType = headers.get(Utils.CONTENT_TYPE_HEADER);
return Utils.CONTENT_TYPE_PROTORPC.equalsIgnoreCase(contentType);
}
/**
@ -160,7 +161,7 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
return Transport.Code.UNKNOWN;
}
String statusLine = headers.status();
AsciiString statusLine = headers.status();
if (statusLine == null) {
return Transport.Code.UNKNOWN;
}

View File

@ -19,6 +19,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
@ -50,7 +51,7 @@ class NettyClientTransport extends AbstractClientTransport {
private final Http2Negotiator.Negotiation negotiation;
private final NettyClientHandler handler;
private final boolean ssl;
private final String authority;
private final AsciiString authority;
private Channel channel;
NettyClientTransport(InetSocketAddress address, NegotiationType negotiationType) {
@ -63,7 +64,7 @@ class NettyClientTransport extends AbstractClientTransport {
this.address = Preconditions.checkNotNull(address, "address");
this.eventGroup = Preconditions.checkNotNull(eventGroup, "eventGroup");
authority = address.getHostString() + ":" + address.getPort();
authority = new AsciiString(address.getHostString() + ":" + address.getPort());
handler = newHandler();
switch (negotiationType) {
@ -94,7 +95,7 @@ class NettyClientTransport extends AbstractClientTransport {
try {
// Convert the headers into Netty HTTP/2 headers.
String defaultPath = "/" + method.getName();
AsciiString defaultPath = new AsciiString("/" + method.getName());
Http2Headers http2Headers = Utils.convertHeaders(headers, ssl, defaultPath, authority);
// Write the request and await creation of the stream.

View File

@ -1,11 +1,11 @@
package com.google.net.stubby.newtransport.netty;
import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_HEADER;
import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_PROTORPC;
import static com.google.net.stubby.newtransport.HttpUtil.HTTP_METHOD;
import static com.google.net.stubby.newtransport.netty.Utils.CONTENT_TYPE_HEADER;
import static com.google.net.stubby.newtransport.netty.Utils.CONTENT_TYPE_PROTORPC;
import static com.google.net.stubby.newtransport.netty.Utils.HTTP_METHOD;
import static com.google.net.stubby.newtransport.netty.Utils.STATUS_OK;
import com.google.common.base.Preconditions;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.ServerStreamListener;
import com.google.net.stubby.newtransport.ServerTransportListener;
@ -33,7 +33,6 @@ import io.netty.handler.codec.http2.Http2StreamException;
import io.netty.util.ReferenceCountUtil;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -86,7 +85,7 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler {
http2Stream.data(stream);
String method = determineMethod(streamId, headers);
ServerStreamListener listener = transportListener.streamCreated(stream, method,
new Metadata.Headers(headers));
Utils.convertHeaders(headers));
stream.setListener(listener);
} catch (Http2Exception e) {
throw e;
@ -181,13 +180,14 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler {
writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
} else if (msg instanceof SendResponseHeadersCommand) {
SendResponseHeadersCommand cmd = (SendResponseHeadersCommand) msg;
writeHeaders(
ctx, cmd.streamId(),
DefaultHttp2Headers.newBuilder()
.status("200")
.set(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC)
.build(),
0, false, promise);
writeHeaders(ctx,
cmd.streamId(),
new DefaultHttp2Headers()
.status(STATUS_OK)
.set(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC),
0,
false,
promise);
} else {
AssertionError e = new AssertionError("Write called for unexpected type: "
+ msg.getClass().getName());
@ -208,7 +208,7 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler {
String.format("Header '%s'='%s', while '%s' is expected", CONTENT_TYPE_HEADER,
headers.get(CONTENT_TYPE_HEADER), CONTENT_TYPE_PROTORPC));
}
String methodName = TransportFrameUtil.getFullMethodNameFromPath(headers.path());
String methodName = TransportFrameUtil.getFullMethodNameFromPath(headers.path().toString());
if (methodName == null) {
throw new Http2StreamException(streamId, Http2Error.REFUSED_STREAM,
String.format("Malformatted path: %s", headers.path()));

View File

@ -1,29 +1,32 @@
package com.google.net.stubby.newtransport.netty;
import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_HEADER;
import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_PROTORPC;
import static com.google.net.stubby.newtransport.HttpUtil.HTTP_METHOD;
import static io.netty.util.CharsetUtil.UTF_8;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.newtransport.HttpUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.inject.Provider;
/**
* Common utility methods.
*/
class Utils {
public static final AsciiString STATUS_OK = new AsciiString("200");
public static final AsciiString HTTP_METHOD = new AsciiString(HttpUtil.HTTP_METHOD);
public static final AsciiString HTTPS = new AsciiString("https");
public static final AsciiString HTTP = new AsciiString("http");
public static final AsciiString CONTENT_TYPE_HEADER =
new AsciiString(HttpUtil.CONTENT_TYPE_HEADER);
public static final AsciiString CONTENT_TYPE_PROTORPC =
new AsciiString(HttpUtil.CONTENT_TYPE_PROTORPC);
/**
* Copies the content of the given {@link ByteBuffer} to a new {@link ByteBuf} instance.
*/
@ -33,59 +36,52 @@ class Utils {
return buf;
}
public static Metadata.Headers convertHeaders(Http2Headers http2Headers) {
// The Netty AsciiString class is really just a wrapper around a byte[] and supports
// arbitrary binary data, not just ASCII.
byte[][] headerValues = new byte[http2Headers.size()*2][];
int i = 0;
for (Map.Entry<AsciiString, AsciiString> entry : http2Headers) {
headerValues[i++] = entry.getKey().array();
headerValues[i++] = entry.getValue().array();
}
return new Metadata.Headers(headerValues);
}
public static Http2Headers convertHeaders(Metadata.Headers headers,
boolean ssl,
String defaultPath,
String defaultAuthority) {
AsciiString defaultPath,
AsciiString defaultAuthority) {
Preconditions.checkNotNull(headers, "headers");
Preconditions.checkNotNull(defaultPath, "defaultPath");
Preconditions.checkNotNull(defaultAuthority, "defaultAuthority");
DefaultHttp2Headers.Builder headersBuilder = DefaultHttp2Headers.newBuilder();
Http2Headers http2Headers = new DefaultHttp2Headers();
// Add any application-provided headers first.
byte[][] serializedHeaders = headers.serialize();
for (int i = 0; i < serializedHeaders.length; i++) {
String key = new String(serializedHeaders[i], UTF_8);
String value = new String(serializedHeaders[++i], UTF_8);
headersBuilder.add(key, value);
http2Headers.add(new AsciiString(serializedHeaders[i], false),
new AsciiString(serializedHeaders[++i], false));
}
// Now set GRPC-specific default headers.
headersBuilder
http2Headers
.authority(defaultAuthority)
.path(defaultPath)
.method(HTTP_METHOD)
.scheme(ssl? "https" : "http")
.scheme(ssl? HTTPS : HTTP)
.add(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC);
// Override the default authority and path if provided by the headers.
if (headers.getAuthority() != null) {
headersBuilder.authority(headers.getAuthority());
http2Headers.authority(new AsciiString(headers.getAuthority()));
}
if (headers.getPath() != null) {
headersBuilder.path(headers.getPath());
http2Headers.path(new AsciiString(headers.getPath()));
}
return headersBuilder.build();
}
public static ImmutableMap<String, Provider<String>> convertHeaders(Http2Headers headers) {
ImmutableMap.Builder<String, Provider<String>> grpcHeaders =
new ImmutableMap.Builder<String, Provider<String>>();
for (Map.Entry<String, String> header : headers) {
if (!header.getKey().startsWith(":")) {
final String value = header.getValue();
// headers starting with ":" are reserved for HTTP/2 built-in headers
grpcHeaders.put(header.getKey(), new Provider<String>() {
@Override
public String get() {
return value;
}
});
}
}
return grpcHeaders.build();
return http2Headers;
}
private Utils() {

View File

@ -1,8 +1,10 @@
package com.google.net.stubby.newtransport.netty;
import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_HEADER;
import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_PROTORPC;
import static com.google.net.stubby.newtransport.HttpUtil.HTTP_METHOD;
import static com.google.net.stubby.newtransport.netty.Utils.CONTENT_TYPE_HEADER;
import static com.google.net.stubby.newtransport.netty.Utils.CONTENT_TYPE_PROTORPC;
import static com.google.net.stubby.newtransport.netty.Utils.HTTPS;
import static com.google.net.stubby.newtransport.netty.Utils.HTTP_METHOD;
import static com.google.net.stubby.newtransport.netty.Utils.STATUS_OK;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
@ -16,13 +18,13 @@ import static org.mockito.Mockito.when;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.HttpUtil;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
@ -76,15 +78,13 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
mockContext();
mockFuture(true);
grpcHeaders = DefaultHttp2Headers
.newBuilder()
.scheme("https")
.authority("www.fake.com")
.path("/fakemethod")
grpcHeaders = new DefaultHttp2Headers()
.scheme(HTTPS)
.authority(as("www.fake.com"))
.path(as("/fakemethod"))
.method(HTTP_METHOD)
.add("auth", "sometoken")
.add(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC)
.build();
.add(as("auth"), as("sometoken"))
.add(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC);
when(stream.state()).thenReturn(StreamState.OPEN);
@ -121,12 +121,12 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
eq(0),
eq(false));
Http2Headers headers = captor.getValue();
assertEquals("https", headers.scheme());
assertEquals("https", headers.scheme().toString());
assertEquals(HTTP_METHOD, headers.method());
assertEquals("www.fake.com", headers.authority());
assertEquals("www.fake.com", headers.authority().toString());
assertEquals(CONTENT_TYPE_PROTORPC, headers.get(CONTENT_TYPE_HEADER));
assertEquals("/fakemethod", headers.path());
assertEquals("sometoken", headers.get("auth"));
assertEquals("/fakemethod", headers.path().toString());
assertEquals("sometoken", headers.get(as("auth")).toString());
}
@Test
@ -170,8 +170,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
createStream();
// Read a headers frame first.
Http2Headers headers = DefaultHttp2Headers.newBuilder().status("200")
.set(HttpUtil.CONTENT_TYPE_HEADER, HttpUtil.CONTENT_TYPE_PROTORPC).build();
Http2Headers headers = new DefaultHttp2Headers().status(STATUS_OK)
.set(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC);
ByteBuf headersFrame = headersFrame(3, headers);
handler.channelRead(this.ctx, headersFrame);
verify(stream).inboundHeadersRecieved(headers, false);
@ -269,4 +269,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
inboundFlow,
outboundFlow);
}
private AsciiString as(String string) {
return new AsciiString(string);
}
}

View File

@ -1,5 +1,8 @@
package com.google.net.stubby.newtransport.netty;
import static com.google.net.stubby.newtransport.netty.Utils.CONTENT_TYPE_HEADER;
import static com.google.net.stubby.newtransport.netty.Utils.CONTENT_TYPE_PROTORPC;
import static com.google.net.stubby.newtransport.netty.Utils.STATUS_OK;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
@ -9,7 +12,6 @@ import static org.mockito.Mockito.verify;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.HttpUtil;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
@ -131,7 +133,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
}
private Http2Headers grpcResponseHeaders() {
return DefaultHttp2Headers.newBuilder().status("200")
.set(HttpUtil.CONTENT_TYPE_HEADER, HttpUtil.CONTENT_TYPE_PROTORPC).build();
return new DefaultHttp2Headers()
.status(STATUS_OK)
.set(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC);
}
}

View File

@ -1,5 +1,8 @@
package com.google.net.stubby.newtransport.netty;
import static com.google.net.stubby.newtransport.netty.Utils.CONTENT_TYPE_HEADER;
import static com.google.net.stubby.newtransport.netty.Utils.CONTENT_TYPE_PROTORPC;
import static com.google.net.stubby.newtransport.netty.Utils.HTTP_METHOD;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -14,10 +17,8 @@ import static org.mockito.Mockito.when;
import com.google.common.io.ByteStreams;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.Framer;
import com.google.net.stubby.newtransport.HttpUtil;
import com.google.net.stubby.newtransport.MessageFramer;
import com.google.net.stubby.newtransport.ServerStream;
import com.google.net.stubby.newtransport.ServerStreamListener;
@ -27,6 +28,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
@ -162,11 +164,10 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
}
private void createStream() throws Exception {
Http2Headers headers = DefaultHttp2Headers.newBuilder()
.method(HttpUtil.HTTP_METHOD)
.set(HttpUtil.CONTENT_TYPE_HEADER, HttpUtil.CONTENT_TYPE_PROTORPC)
.path("/foo.bar")
.build();
Http2Headers headers = new DefaultHttp2Headers()
.method(HTTP_METHOD)
.set(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC)
.path(new AsciiString("/foo.bar"));
ByteBuf headersFrame = headersFrame(STREAM_ID, headers);
handler.channelRead(ctx, headersFrame);
ArgumentCaptor<NettyServerStream> streamCaptor =

View File

@ -2,7 +2,6 @@ package com.google.net.stubby.newtransport.netty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.notNull;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.never;
@ -26,7 +25,7 @@ import org.mockito.Mock;
public class NettyServerStreamTest extends NettyStreamTestBase {
@Mock
protected ServerStreamListener listener;
protected ServerStreamListener serverListener;
private Metadata.Trailers trailers = new Metadata.Trailers();
@Test
@ -46,7 +45,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
} catch (IllegalStateException expected) {
}
assertEquals(StreamState.OPEN, stream.state());
verifyZeroInteractions(listener);
verifyZeroInteractions(serverListener);
}
@Test
@ -55,12 +54,12 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
stream().close(Status.CANCELLED, trailers);
verify(channel).writeAndFlush(
new SendGrpcFrameCommand(STREAM_ID, statusFrame(Status.CANCELLED), true));
verifyZeroInteractions(listener);
verifyZeroInteractions(serverListener);
// Sending complete. Listener gets closed()
stream().complete();
verify(listener).closed(Status.CANCELLED, trailers);
verify(serverListener).closed(Status.CANCELLED, trailers);
assertEquals(StreamState.CLOSED, stream.state());
verifyZeroInteractions(listener);
verifyZeroInteractions(serverListener);
}
@Test
@ -68,17 +67,17 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
// Client half-closes. Listener gets halfClosed()
stream().remoteEndClosed();
assertEquals(StreamState.WRITE_ONLY, stream.state());
verify(listener).halfClosed();
verify(serverListener).halfClosed();
// Server closes. Status sent
stream().close(Status.OK, trailers);
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(serverListener);
assertEquals(StreamState.CLOSED, stream.state());
verify(channel).writeAndFlush(
new SendGrpcFrameCommand(STREAM_ID, statusFrame(Status.OK), true));
// Sending and receiving complete. Listener gets closed()
stream().complete();
verify(listener).closed(Status.OK, trailers);
verifyNoMoreInteractions(listener);
verify(serverListener).closed(Status.OK, trailers);
verifyNoMoreInteractions(serverListener);
}
@Test
@ -86,7 +85,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
// Client half-closes. Listener gets halfClosed()
stream().remoteEndClosed();
assertEquals(StreamState.WRITE_ONLY, stream.state());
verify(listener).halfClosed();
verify(serverListener).halfClosed();
// Client half-closes again.
try {
stream().remoteEndClosed();
@ -94,7 +93,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
} catch (IllegalStateException expected) {
}
assertEquals(StreamState.WRITE_ONLY, stream.state());
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(serverListener);
}
@Test
@ -102,9 +101,9 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
Status status = new Status(Transport.Code.INTERNAL, new Throwable());
stream().abortStream(status, true);
assertEquals(StreamState.CLOSED, stream.state());
verify(listener).closed(same(status), notNull(Metadata.Trailers.class));
verify(serverListener).closed(same(status), notNull(Metadata.Trailers.class));
verify(channel).writeAndFlush(new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true));
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(serverListener);
}
@Test
@ -112,10 +111,10 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
Status status = new Status(Transport.Code.INTERNAL, new Throwable());
stream().abortStream(status, false);
assertEquals(StreamState.CLOSED, stream.state());
verify(listener).closed(same(status), notNull(Metadata.Trailers.class));
verify(serverListener).closed(same(status), notNull(Metadata.Trailers.class));
verify(channel, never()).writeAndFlush(
new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true));
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(serverListener);
}
@Test
@ -124,26 +123,26 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
// Client half-closes. Listener gets halfClosed()
stream().remoteEndClosed();
assertEquals(StreamState.WRITE_ONLY, stream.state());
verify(listener).halfClosed();
verify(serverListener).halfClosed();
// Abort
stream().abortStream(status, true);
verify(listener).closed(same(status), notNull(Metadata.Trailers.class));
verify(serverListener).closed(same(status), notNull(Metadata.Trailers.class));
assertEquals(StreamState.CLOSED, stream.state());
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(serverListener);
}
@Override
protected NettyStream createStream() {
NettyServerStream stream = new NettyServerStream(channel, STREAM_ID, inboundFlow);
stream.setListener(listener);
stream.setListener(serverListener);
assertEquals(StreamState.OPEN, stream.state());
verifyZeroInteractions(listener);
verifyZeroInteractions(serverListener);
return stream;
}
@Override
protected ServerStreamListener listener() {
return listener;
return serverListener;
}
private NettyServerStream stream() {