Propagate headers/trailers if response isn't gRPC

This provides more structured data into the application for it to do
special handling.

In general, we would hope most people don't need this functionality, but
it is a good escape hatch to allow users to workaround infrastructure
problems.
This commit is contained in:
Eric Anderson 2015-12-15 11:53:10 -08:00
parent 5205d9cc43
commit bcb5fcdf82
5 changed files with 52 additions and 12 deletions

View File

@ -97,8 +97,10 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
* responsible for properly closing streams when protocol errors occur.
*
* @param errorStatus the error to report
* @param metadata any metadata received
*/
protected void inboundTransportError(Status errorStatus) {
protected void inboundTransportError(Status errorStatus, Metadata metadata) {
Preconditions.checkNotNull(metadata, "metadata");
if (inboundPhase() == Phase.STATUS) {
log.log(Level.INFO, "Received transport error on closed stream {0} {1}",
new Object[]{id(), errorStatus});
@ -106,7 +108,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
}
// For transport errors we immediately report status to the application layer
// and do not wait for additional payloads.
transportReportStatus(errorStatus, false, new Metadata());
transportReportStatus(errorStatus, false, metadata);
}
/**
@ -130,7 +132,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
.withCause(e);
// TODO(carl-mastrangelo): look back into tearing down this stream. sendCancel() can be
// buffered.
inboundTransportError(status);
inboundTransportError(status, headers);
sendCancel(status);
return;
}
@ -155,7 +157,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
if (inboundPhase() == Phase.HEADERS) {
// Have not received headers yet so error
inboundTransportError(Status.INTERNAL
.withDescription("headers not received before payload"));
.withDescription("headers not received before payload"), new Metadata());
return;
}
inboundPhase(Phase.MESSAGE);

View File

@ -65,7 +65,9 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
private static final Metadata.Key<Integer> HTTP2_STATUS = Metadata.Key.of(":status",
HTTP_STATUS_LINE_MARSHALLER);
/** When non-{@code null}, {@link #transportErrorMetadata} must also be non-{@code null}. */
private Status transportError;
private Metadata transportErrorMetadata;
private Charset errorCharset = Charsets.UTF_8;
private boolean contentTypeChecked;
@ -99,6 +101,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
// Note we don't immediately report the transport error, instead we wait for more data on the
// stream so we can accumulate more detail into the error before reporting it.
transportError = transportError.augmentDescription("\n" + headers.toString());
transportErrorMetadata = headers;
errorCharset = extractCharset(headers);
} else {
stripTransportDetails(headers);
@ -117,6 +120,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
// Must receive headers prior to receiving any payload as we use headers to check for
// protocol correctness.
transportError = Status.INTERNAL.withDescription("no headers received prior to data");
transportErrorMetadata = new Metadata();
}
if (transportError != null) {
// We've already detected a transport error and now we're just accumulating more detail
@ -125,7 +129,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
+ ReadableBuffers.readAsString(frame, errorCharset));
frame.close();
if (transportError.getDescription().length() > 1000 || endOfStream) {
inboundTransportError(transportError);
inboundTransportError(transportError, transportErrorMetadata);
// We have enough error detail so lets cancel.
sendCancel(Status.CANCELLED);
}
@ -134,7 +138,8 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
if (endOfStream) {
// This is a protocol violation as we expect to receive trailers.
transportError = Status.INTERNAL.withDescription("Recevied EOS on DATA frame");
inboundTransportError(transportError);
transportErrorMetadata = new Metadata();
inboundTransportError(transportError, transportErrorMetadata);
}
}
}
@ -151,9 +156,10 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
transportError = transportError.augmentDescription(trailers.toString());
} else {
transportError = checkContentType(trailers);
transportErrorMetadata = trailers;
}
if (transportError != null) {
inboundTransportError(transportError);
inboundTransportError(transportError, transportErrorMetadata);
sendCancel(Status.CANCELLED);
} else {
Status status = statusFromTrailers(trailers);

View File

@ -245,11 +245,15 @@ public class AbstractClientStreamTest {
stream.start(mockListener);
Metadata headers = new Metadata();
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, "bad");
Metadata.Key<String> randomKey = Metadata.Key.of("random", Metadata.ASCII_STRING_MARSHALLER);
headers.put(randomKey, "4");
stream.inboundHeadersReceived(headers);
verify(mockListener).closed(statusCaptor.capture(), isA(Metadata.class));
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(mockListener).closed(statusCaptor.capture(), metadataCaptor.capture());
assertEquals(Code.INTERNAL, statusCaptor.getValue().getCode());
assertEquals("4", metadataCaptor.getValue().get(randomKey));
}
@Test

View File

@ -236,6 +236,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
public void invalidInboundHeadersCancelStream() throws Exception {
stream().id(STREAM_ID);
Http2Headers headers = grpcResponseHeaders();
headers.set("random", "4");
headers.remove(CONTENT_TYPE_HEADER);
// Remove once b/16290036 is fixed.
headers.status(new AsciiString("500"));
@ -252,8 +253,11 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
// Now verify that cancel is sent and an error is reported to the listener
verify(writeQueue).enqueue(any(CancelClientStreamCommand.class), eq(true));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).closed(captor.capture(), any(Metadata.class));
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(listener).closed(captor.capture(), metadataCaptor.capture());
assertEquals(Status.UNKNOWN.getCode(), captor.getValue().getCode());
assertEquals("4", metadataCaptor.getValue()
.get(Metadata.Key.of("random", Metadata.ASCII_STRING_MARSHALLER)));
assertTrue(stream.isClosed());
}
@ -269,10 +273,13 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
.set(new AsciiString("grpc-status", UTF_8), new AsciiString("0", UTF_8));
stream().transportHeadersReceived(trailers, true);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).closed(captor.capture(), any(Metadata.class));
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(listener).closed(captor.capture(), metadataCaptor.capture());
Status status = captor.getValue();
assertEquals(Status.Code.INTERNAL, status.getCode());
assertTrue(status.getDescription().contains("content-type"));
assertEquals("application/bad", metadataCaptor.getValue()
.get(Metadata.Key.of("Content-Type", Metadata.ASCII_STRING_MARSHALLER)));
}
@Test

View File

@ -325,8 +325,8 @@ public class OkHttpClientTransportTest {
stream.start(listener);
stream.request(1);
assertContainStream(3);
// Empty headers block without correct content type or status
frameHandler().headers(false, false, 3, 0, new ArrayList<Header>(),
// Headers block without correct content type or status
frameHandler().headers(false, false, 3, 0, Arrays.asList(new Header("random", "4")),
HeadersMode.HTTP_20_HEADERS);
// Now wait to receive 1000 bytes of data so we can have a better error message before
// cancelling the streaam.
@ -335,6 +335,27 @@ public class OkHttpClientTransportTest {
assertNull(listener.headers);
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
assertNotNull(listener.trailers);
assertEquals("4", listener.trailers
.get(Metadata.Key.of("random", Metadata.ASCII_STRING_MARSHALLER)));
shutdownAndVerify();
}
@Test
public void invalidInboundTrailersPropagateToMetadata() throws Exception {
initTransport();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata());
stream.start(listener);
stream.request(1);
assertContainStream(3);
// Headers block with EOS without correct content type or status
frameHandler().headers(true, true, 3, 0, Arrays.asList(new Header("random", "4")),
HeadersMode.HTTP_20_HEADERS);
assertNull(listener.headers);
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
assertNotNull(listener.trailers);
assertEquals("4", listener.trailers
.get(Metadata.Key.of("random", Metadata.ASCII_STRING_MARSHALLER)));
shutdownAndVerify();
}