netty: use custom Http2Headers class for encoding Metadata

Before:
Benchmark                              (headerCount)    Mode     Cnt     Score    Error  Units
HeadersBenchmark.convertClientHeaders             10  sample  127008   631.214 ±  3.543  ns/op
HeadersBenchmark.convertClientHeaders             20  sample  142036  1125.874 ± 21.114  ns/op
HeadersBenchmark.convertClientHeaders             50  sample  117570  2678.635 ± 47.764  ns/op
HeadersBenchmark.convertClientHeaders            100  sample  115919  5427.720 ± 67.956  ns/op
HeadersBenchmark.convertServerHeaders             10  sample  133903   610.970 ±  3.094  ns/op
HeadersBenchmark.convertServerHeaders             20  sample  138155  1154.304 ±  4.595  ns/op
HeadersBenchmark.convertServerHeaders             50  sample  120078  2658.175 ± 38.679  ns/op
HeadersBenchmark.convertServerHeaders            100  sample  120509  5212.341 ± 49.062  ns/op

After:
Benchmark                              (headerCount)    Mode     Cnt     Score    Error  Units
HeadersBenchmark.convertClientHeaders             10  sample  102473   407.383 ±  2.693  ns/op
HeadersBenchmark.convertClientHeaders             20  sample  103205   791.241 ± 38.054  ns/op
HeadersBenchmark.convertClientHeaders             50  sample  173817  1840.311 ±  5.718  ns/op
HeadersBenchmark.convertClientHeaders            100  sample  169984  3690.753 ± 44.308  ns/op
HeadersBenchmark.convertServerHeaders             10  sample  103615   401.661 ±  2.922  ns/op
HeadersBenchmark.convertServerHeaders             20  sample   99060   823.453 ±  5.553  ns/op
HeadersBenchmark.convertServerHeaders             50  sample  171824  1846.788 ± 29.840  ns/op
HeadersBenchmark.convertServerHeaders            100  sample  171622  3670.354 ±  7.127  ns/op
This commit is contained in:
Carl Mastrangelo 2016-05-23 17:45:31 -07:00
parent 149ce204e2
commit fa22259e3a
5 changed files with 821 additions and 69 deletions

View File

@ -605,7 +605,7 @@ public abstract class AbstractInteropTest {
Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY)); Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY));
} }
@Test(timeout = 100000000) @Test(timeout = 10000)
public void sendsTimeoutHeader() { public void sendsTimeoutHeader() {
long configuredTimeoutMinutes = 100; long configuredTimeoutMinutes = 100;
TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel) TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel)

View File

@ -0,0 +1,559 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.netty;
import io.netty.handler.codec.Headers;
import io.netty.handler.codec.http2.Http2Headers;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
abstract class AbstractHttp2Headers implements Http2Headers {
@Override
public int size() {
throw new UnsupportedOperationException();
}
@Override
public boolean isEmpty() {
throw new UnsupportedOperationException();
}
@Override
public Set<CharSequence> names() {
throw new UnsupportedOperationException();
}
@Override
public CharSequence get(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public CharSequence get(CharSequence name, CharSequence defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public CharSequence getAndRemove(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public CharSequence getAndRemove(CharSequence name, CharSequence defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public List<CharSequence> getAll(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public List<CharSequence> getAllAndRemove(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public Boolean getBoolean(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public boolean getBoolean(CharSequence name, boolean defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Byte getByte(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public byte getByte(CharSequence name, byte defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Character getChar(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public char getChar(CharSequence name, char defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Short getShort(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public short getShort(CharSequence name, short defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Integer getInt(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public int getInt(CharSequence name, int defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Long getLong(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public long getLong(CharSequence name, long defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Float getFloat(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public float getFloat(CharSequence name, float defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Double getDouble(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public double getDouble(CharSequence name, double defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Long getTimeMillis(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public long getTimeMillis(CharSequence name, long defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Boolean getBooleanAndRemove(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public boolean getBooleanAndRemove(CharSequence name, boolean defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Byte getByteAndRemove(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public byte getByteAndRemove(CharSequence name, byte defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Character getCharAndRemove(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public char getCharAndRemove(CharSequence name, char defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Short getShortAndRemove(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public short getShortAndRemove(CharSequence name, short defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Integer getIntAndRemove(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public int getIntAndRemove(CharSequence name, int defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Long getLongAndRemove(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public long getLongAndRemove(CharSequence name, long defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Float getFloatAndRemove(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public float getFloatAndRemove(CharSequence name, float defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Double getDoubleAndRemove(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public double getDoubleAndRemove(CharSequence name, double defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public Long getTimeMillisAndRemove(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public long getTimeMillisAndRemove(CharSequence name, long defaultValue) {
throw new UnsupportedOperationException();
}
@Override
public boolean contains(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public boolean contains(CharSequence name, CharSequence value) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsObject(CharSequence name, Object value) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsBoolean(CharSequence name, boolean value) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsByte(CharSequence name, byte value) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsChar(CharSequence name, char value) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsShort(CharSequence name, short value) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsInt(CharSequence name, int value) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsLong(CharSequence name, long value) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsFloat(CharSequence name, float value) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsDouble(CharSequence name, double value) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsTimeMillis(CharSequence name, long value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers add(CharSequence name, CharSequence... values) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers add(Headers<? extends CharSequence, ? extends CharSequence, ?> headers) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers add(CharSequence name, CharSequence value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers add(CharSequence name, Iterable<? extends CharSequence> values) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers addObject(CharSequence name, Object value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers addObject(CharSequence name, Iterable<?> values) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers addObject(CharSequence name, Object... values) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers addBoolean(CharSequence name, boolean value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers addByte(CharSequence name, byte value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers addChar(CharSequence name, char value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers addShort(CharSequence name, short value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers addInt(CharSequence name, int value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers addLong(CharSequence name, long value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers addFloat(CharSequence name, float value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers addDouble(CharSequence name, double value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers addTimeMillis(CharSequence name, long value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers set(Headers<? extends CharSequence, ? extends CharSequence, ?> headers) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers set(CharSequence name, CharSequence value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers set(CharSequence name, Iterable<? extends CharSequence> values) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers set(CharSequence name, CharSequence... values) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setObject(CharSequence name, Object value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setObject(CharSequence name, Iterable<?> values) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setObject(CharSequence name, Object... values) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setBoolean(CharSequence name, boolean value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setByte(CharSequence name, byte value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setChar(CharSequence name, char value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setShort(CharSequence name, short value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setInt(CharSequence name, int value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setLong(CharSequence name, long value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setFloat(CharSequence name, float value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setDouble(CharSequence name, double value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setTimeMillis(CharSequence name, long value) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers setAll(Headers<? extends CharSequence, ? extends CharSequence, ?> headers) {
throw new UnsupportedOperationException();
}
@Override
public boolean remove(CharSequence name) {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers clear() {
throw new UnsupportedOperationException();
}
@Override
public Iterator<Entry<CharSequence, CharSequence>> iterator() {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers method(CharSequence value) {
throw new UnsupportedOperationException();
}
@Override
public CharSequence method() {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers scheme(CharSequence value) {
throw new UnsupportedOperationException();
}
@Override
public CharSequence scheme() {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers authority(CharSequence value) {
throw new UnsupportedOperationException();
}
@Override
public CharSequence authority() {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers path(CharSequence value) {
throw new UnsupportedOperationException();
}
@Override
public CharSequence path() {
throw new UnsupportedOperationException();
}
@Override
public Http2Headers status(CharSequence value) {
throw new UnsupportedOperationException();
}
@Override
public CharSequence status() {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,147 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.netty;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.AsciiString;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
/**
* A custom implementation of Http2Headers that only includes methods used by gRPC.
*/
final class GrpcHttp2Headers extends AbstractHttp2Headers {
private final AsciiString[] normalHeaders;
private final AsciiString[] preHeaders;
private static final AsciiString[] EMPTY = new AsciiString[]{};
static GrpcHttp2Headers clientRequestHeaders(byte[][] serializedMetadata, AsciiString authority,
AsciiString path, AsciiString method, AsciiString scheme, AsciiString userAgent) {
AsciiString[] preHeaders = new AsciiString[] {
Http2Headers.PseudoHeaderName.AUTHORITY.value(), authority,
Http2Headers.PseudoHeaderName.PATH.value(), path,
Http2Headers.PseudoHeaderName.METHOD.value(), method,
Http2Headers.PseudoHeaderName.SCHEME.value(), scheme,
Utils.CONTENT_TYPE_HEADER, Utils.CONTENT_TYPE_GRPC,
Utils.TE_HEADER, Utils.TE_TRAILERS,
Utils.USER_AGENT, userAgent,
};
return new GrpcHttp2Headers(preHeaders, serializedMetadata);
}
static GrpcHttp2Headers serverResponseHeaders(byte[][] serializedMetadata) {
AsciiString[] preHeaders = new AsciiString[] {
Http2Headers.PseudoHeaderName.STATUS.value(), Utils.STATUS_OK,
Utils.CONTENT_TYPE_HEADER, Utils.CONTENT_TYPE_GRPC,
};
return new GrpcHttp2Headers(preHeaders, serializedMetadata);
}
static GrpcHttp2Headers serverResponseTrailers(byte[][] serializedMetadata) {
return new GrpcHttp2Headers(EMPTY, serializedMetadata);
}
private GrpcHttp2Headers(AsciiString[] preHeaders, byte[][] serializedMetadata) {
normalHeaders = new AsciiString[serializedMetadata.length];
for (int i = 0; i < normalHeaders.length; i++) {
normalHeaders[i] = new AsciiString(serializedMetadata[i], false);
}
this.preHeaders = preHeaders;
}
@Override
public Iterator<Entry<CharSequence, CharSequence>> iterator() {
return new Itr();
}
@Override
public int size() {
return (normalHeaders.length + preHeaders.length) / 2;
}
private class Itr implements Entry<CharSequence, CharSequence>,
Iterator<Entry<CharSequence, CharSequence>> {
private int idx;
private AsciiString[] current = preHeaders.length != 0 ? preHeaders : normalHeaders;
private AsciiString key;
private AsciiString value;
@Override
public boolean hasNext() {
return idx < current.length;
}
/**
* This function is ordered specifically to get ideal performance on OpenJDK. If you decide to
* change it, even in ways that don't seem possible to affect performance, please benchmark
* speeds before and after.
*/
@Override
public Entry<CharSequence, CharSequence> next() {
if (hasNext()) {
key = current[idx];
value = current[idx + 1];
idx += 2;
if (idx >= current.length && current == preHeaders) {
current = normalHeaders;
idx = 0;
}
return this;
} else {
throw new NoSuchElementException();
}
}
@Override
public CharSequence getKey() {
return key;
}
@Override
public CharSequence getValue() {
return value;
}
@Override
public CharSequence setValue(CharSequence value) {
throw new UnsupportedOperationException();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
}

View File

@ -45,7 +45,6 @@ import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.internal.TransportFrameUtil; import io.grpc.internal.TransportFrameUtil;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
@ -121,29 +120,18 @@ class Utils {
AsciiString userAgent) { AsciiString userAgent) {
Preconditions.checkNotNull(defaultPath, "defaultPath"); Preconditions.checkNotNull(defaultPath, "defaultPath");
Preconditions.checkNotNull(authority, "authority"); Preconditions.checkNotNull(authority, "authority");
// Add any application-provided headers first.
Http2Headers http2Headers = convertMetadata(headers);
// Now set GRPC-specific default headers. return GrpcHttp2Headers.clientRequestHeaders(
http2Headers.authority(authority) TransportFrameUtil.toHttp2Headers(headers),
.path(defaultPath) authority,
.method(HTTP_METHOD) defaultPath,
.scheme(scheme) HTTP_METHOD,
.set(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC) scheme,
.set(TE_HEADER, TE_TRAILERS); userAgent);
// Set the User-Agent header.
//String userAgent = GrpcUtil.getGrpcUserAgent("netty", headers.get(USER_AGENT_KEY));
http2Headers.set(USER_AGENT, userAgent);
return http2Headers;
} }
public static Http2Headers convertServerHeaders(Metadata headers) { public static Http2Headers convertServerHeaders(Metadata headers) {
Http2Headers http2Headers = convertMetadata(headers); return GrpcHttp2Headers.serverResponseHeaders(TransportFrameUtil.toHttp2Headers(headers));
http2Headers.set(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC);
http2Headers.status(STATUS_OK);
return http2Headers;
} }
public static Metadata convertTrailers(Http2Headers http2Headers) { public static Metadata convertTrailers(Http2Headers http2Headers) {
@ -151,25 +139,10 @@ class Utils {
} }
public static Http2Headers convertTrailers(Metadata trailers, boolean headersSent) { public static Http2Headers convertTrailers(Metadata trailers, boolean headersSent) {
Http2Headers http2Trailers = convertMetadata(trailers);
if (!headersSent) { if (!headersSent) {
http2Trailers.set(Utils.CONTENT_TYPE_HEADER, Utils.CONTENT_TYPE_GRPC); return convertServerHeaders(trailers);
http2Trailers.status(STATUS_OK);
} }
return http2Trailers; return GrpcHttp2Headers.serverResponseTrailers(TransportFrameUtil.toHttp2Headers(trailers));
}
private static Http2Headers convertMetadata(Metadata headers) {
Preconditions.checkNotNull(headers, "headers");
Http2Headers http2Headers = new DefaultHttp2Headers(validateHeaders, headers.headerCount());
byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(headers);
for (int i = 0; i < serializedHeaders.length; i += 2) {
AsciiString name = new AsciiString(serializedHeaders[i], false);
AsciiString value = new AsciiString(serializedHeaders[i + 1], false);
http2Headers.add(name, value);
}
return http2Headers;
} }
public static Status statusFromThrowable(Throwable t) { public static Status statusFromThrowable(Throwable t) {

View File

@ -31,11 +31,13 @@
package io.grpc.netty; package io.grpc.netty;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static io.grpc.netty.NettyTestUtil.messageFrame; import static io.grpc.netty.NettyTestUtil.messageFrame;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
@ -46,6 +48,9 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerStreamListener;
@ -60,6 +65,8 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -75,6 +82,9 @@ public class NettyServerStreamTest extends NettyStreamTestBase<NettyServerStream
@Mock @Mock
private NettyServerHandler handler; private NettyServerHandler handler;
@Captor
private ArgumentCaptor<Http2Headers> headersCaptor;
private Metadata trailers = new Metadata(); private Metadata trailers = new Metadata();
@Before @Before
@ -89,56 +99,98 @@ public class NettyServerStreamTest extends NettyStreamTestBase<NettyServerStream
@Test @Test
public void writeMessageShouldSendResponse() throws Exception { public void writeMessageShouldSendResponse() throws Exception {
ListMultimap<CharSequence, CharSequence> expectedHeaders =
ImmutableListMultimap.copyOf(new DefaultHttp2Headers()
.status(Utils.STATUS_OK)
.set(Utils.CONTENT_TYPE_HEADER, Utils.CONTENT_TYPE_GRPC));
stream.writeHeaders(new Metadata()); stream.writeHeaders(new Metadata());
Http2Headers headers = new DefaultHttp2Headers()
.status(Utils.STATUS_OK) ArgumentCaptor<SendResponseHeadersCommand> sendHeadersCap =
.set(Utils.CONTENT_TYPE_HEADER, Utils.CONTENT_TYPE_GRPC); ArgumentCaptor.forClass(SendResponseHeadersCommand.class);
verify(writeQueue).enqueue( verify(writeQueue).enqueue(sendHeadersCap.capture(), eq(true));
new SendResponseHeadersCommand(stream.transportState(), headers, false), true); SendResponseHeadersCommand sendHeaders = sendHeadersCap.getValue();
assertThat(sendHeaders.stream()).isSameAs(stream.transportState());
assertThat(ImmutableListMultimap.copyOf(sendHeaders.headers()))
.containsExactlyEntriesIn(expectedHeaders);
assertThat(sendHeaders.endOfStream()).isFalse();
byte[] msg = smallMessage(); byte[] msg = smallMessage();
stream.writeMessage(new ByteArrayInputStream(msg)); stream.writeMessage(new ByteArrayInputStream(msg));
stream.flush(); stream.flush();
verify(writeQueue).enqueue( verify(writeQueue).enqueue(
eq(new SendGrpcFrameCommand(stream.transportState(), messageFrame(MESSAGE), false)), eq(new SendGrpcFrameCommand(stream.transportState(), messageFrame(MESSAGE), false)),
any(ChannelPromise.class), isA(ChannelPromise.class),
eq(true)); eq(true));
} }
@Test @Test
public void writeHeadersShouldSendHeaders() throws Exception { public void writeHeadersShouldSendHeaders() throws Exception {
Metadata headers = new Metadata(); Metadata headers = new Metadata();
ListMultimap<CharSequence, CharSequence> expectedHeaders =
ImmutableListMultimap.copyOf(Utils.convertServerHeaders(headers));
stream().writeHeaders(headers); stream().writeHeaders(headers);
verify(writeQueue).enqueue(new SendResponseHeadersCommand(stream.transportState(),
Utils.convertServerHeaders(headers), false), true); ArgumentCaptor<SendResponseHeadersCommand> sendHeadersCap =
ArgumentCaptor.forClass(SendResponseHeadersCommand.class);
verify(writeQueue).enqueue(sendHeadersCap.capture(), eq(true));
SendResponseHeadersCommand sendHeaders = sendHeadersCap.getValue();
assertThat(sendHeaders.stream()).isSameAs(stream.transportState());
assertThat(ImmutableListMultimap.copyOf(sendHeaders.headers()))
.containsExactlyEntriesIn(expectedHeaders);
assertThat(sendHeaders.endOfStream()).isFalse();
} }
@Test @Test
public void closeBeforeClientHalfCloseShouldSucceed() throws Exception { public void closeBeforeClientHalfCloseShouldSucceed() throws Exception {
stream().close(Status.OK, new Metadata()); ListMultimap<CharSequence, CharSequence> expectedHeaders =
verify(writeQueue).enqueue( ImmutableListMultimap.copyOf(new DefaultHttp2Headers()
new SendResponseHeadersCommand(stream.transportState(), new DefaultHttp2Headers()
.status(new AsciiString("200")) .status(new AsciiString("200"))
.set(new AsciiString("content-type"), new AsciiString("application/grpc")) .set(new AsciiString("content-type"), new AsciiString("application/grpc"))
.set(new AsciiString("grpc-status"), new AsciiString("0")), true), .set(new AsciiString("grpc-status"), new AsciiString("0")));
true);
stream().close(Status.OK, new Metadata());
ArgumentCaptor<SendResponseHeadersCommand> sendHeadersCap =
ArgumentCaptor.forClass(SendResponseHeadersCommand.class);
verify(writeQueue).enqueue(sendHeadersCap.capture(), eq(true));
SendResponseHeadersCommand sendHeaders = sendHeadersCap.getValue();
assertThat(sendHeaders.stream()).isSameAs(stream.transportState());
assertThat(ImmutableListMultimap.copyOf(sendHeaders.headers()))
.containsExactlyEntriesIn(expectedHeaders);
assertThat(sendHeaders.endOfStream()).isTrue();
verifyZeroInteractions(serverListener); verifyZeroInteractions(serverListener);
// Sending complete. Listener gets closed() // Sending complete. Listener gets closed()
stream().transportState().complete(); stream().transportState().complete();
verify(serverListener).closed(Status.OK); verify(serverListener).closed(Status.OK);
verifyZeroInteractions(serverListener); verifyZeroInteractions(serverListener);
} }
@Test @Test
public void closeWithErrorBeforeClientHalfCloseShouldSucceed() throws Exception { public void closeWithErrorBeforeClientHalfCloseShouldSucceed() throws Exception {
// Error is sent on wire and ends the stream ListMultimap<CharSequence, CharSequence> expectedHeaders =
stream().close(Status.CANCELLED, trailers); ImmutableListMultimap.copyOf(new DefaultHttp2Headers()
verify(writeQueue).enqueue(
new SendResponseHeadersCommand(stream.transportState(), new DefaultHttp2Headers()
.status(new AsciiString("200")) .status(new AsciiString("200"))
.set(new AsciiString("content-type"), new AsciiString("application/grpc")) .set(new AsciiString("content-type"), new AsciiString("application/grpc"))
.set(new AsciiString("grpc-status"), new AsciiString("1")), true), .set(new AsciiString("grpc-status"), new AsciiString("1")));
true);
// Error is sent on wire and ends the stream
stream().close(Status.CANCELLED, trailers);
ArgumentCaptor<SendResponseHeadersCommand> sendHeadersCap =
ArgumentCaptor.forClass(SendResponseHeadersCommand.class);
verify(writeQueue).enqueue(sendHeadersCap.capture(), eq(true));
SendResponseHeadersCommand sendHeaders = sendHeadersCap.getValue();
assertThat(sendHeaders.stream()).isSameAs(stream.transportState());
assertThat(ImmutableListMultimap.copyOf(sendHeaders.headers()))
.containsExactlyEntriesIn(expectedHeaders);
assertThat(sendHeaders.endOfStream()).isTrue();
verifyZeroInteractions(serverListener); verifyZeroInteractions(serverListener);
// Sending complete. Listener gets closed() // Sending complete. Listener gets closed()
stream().transportState().complete(); stream().transportState().complete();
verify(serverListener).closed(Status.OK); verify(serverListener).closed(Status.OK);
@ -147,19 +199,31 @@ public class NettyServerStreamTest extends NettyStreamTestBase<NettyServerStream
@Test @Test
public void closeAfterClientHalfCloseShouldSucceed() throws Exception { public void closeAfterClientHalfCloseShouldSucceed() throws Exception {
ListMultimap<CharSequence, CharSequence> expectedHeaders =
ImmutableListMultimap.copyOf(new DefaultHttp2Headers()
.status(new AsciiString("200"))
.set(new AsciiString("content-type"), new AsciiString("application/grpc"))
.set(new AsciiString("grpc-status"), new AsciiString("0")));
// Client half-closes. Listener gets halfClosed() // Client half-closes. Listener gets halfClosed()
stream().transportState() stream().transportState()
.inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true); .inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true);
verify(serverListener).halfClosed(); verify(serverListener).halfClosed();
// Server closes. Status sent // Server closes. Status sent
stream().close(Status.OK, trailers); stream().close(Status.OK, trailers);
verifyNoMoreInteractions(serverListener); verifyNoMoreInteractions(serverListener);
verify(writeQueue).enqueue(
new SendResponseHeadersCommand(stream.transportState(), new DefaultHttp2Headers() ArgumentCaptor<SendResponseHeadersCommand> cmdCap =
.status(new AsciiString("200")) ArgumentCaptor.forClass(SendResponseHeadersCommand.class);
.set(new AsciiString("content-type"), new AsciiString("application/grpc")) verify(writeQueue).enqueue(cmdCap.capture(), eq(true));
.set(new AsciiString("grpc-status"), new AsciiString("0")), true), SendResponseHeadersCommand cmd = cmdCap.getValue();
true); assertThat(cmd.stream()).isSameAs(stream.transportState());
assertThat(ImmutableListMultimap.copyOf(cmd.headers()))
.containsExactlyEntriesIn(expectedHeaders);
assertThat(cmd.endOfStream()).isTrue();
// Sending and receiving complete. Listener gets closed() // Sending and receiving complete. Listener gets closed()
stream().transportState().complete(); stream().transportState().complete();
verify(serverListener).closed(Status.OK); verify(serverListener).closed(Status.OK);
@ -190,14 +254,23 @@ public class NettyServerStreamTest extends NettyStreamTestBase<NettyServerStream
} }
@Test @Test
public void emptyFramerShouldSendNoPayload() throws Exception { public void emptyFramerShouldSendNoPayload() {
stream().close(Status.OK, new Metadata()); ListMultimap<CharSequence, CharSequence> expectedHeaders =
verify(writeQueue).enqueue( ImmutableListMultimap.copyOf(new DefaultHttp2Headers()
new SendResponseHeadersCommand(stream.transportState(), new DefaultHttp2Headers()
.status(new AsciiString("200")) .status(new AsciiString("200"))
.set(new AsciiString("content-type"), new AsciiString("application/grpc")) .set(new AsciiString("content-type"), new AsciiString("application/grpc"))
.set(new AsciiString("grpc-status"), new AsciiString("0")), true), .set(new AsciiString("grpc-status"), new AsciiString("0")));
true); ArgumentCaptor<SendResponseHeadersCommand> cmdCap =
ArgumentCaptor.forClass(SendResponseHeadersCommand.class);
stream().close(Status.OK, new Metadata());
verify(writeQueue).enqueue(cmdCap.capture(), eq(true));
SendResponseHeadersCommand cmd = cmdCap.getValue();
assertThat(cmd.stream()).isSameAs(stream.transportState());
assertThat(ImmutableListMultimap.copyOf(cmd.headers()))
.containsExactlyEntriesIn(expectedHeaders);
assertThat(cmd.endOfStream()).isTrue();
} }
@Test @Test