core: do not lose status when RST_STREAM with NO_ERROR received (#5264)

This commit is contained in:
Eric Gribkoff 2019-01-24 08:50:09 -08:00 committed by GitHub
parent 3a38e59bae
commit 1d97b50315
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 19 deletions

View File

@ -240,8 +240,8 @@ public abstract class AbstractClientStream extends AbstractStream
* #listenerClosed} because there may still be messages buffered to deliver to the application. * #listenerClosed} because there may still be messages buffered to deliver to the application.
*/ */
private boolean statusReported; private boolean statusReported;
private Metadata trailers; /** True if the status reported (set via {@link #transportReportStatus}) is OK. */
private Status trailerStatus; private boolean statusReportedIsOk;
protected TransportState( protected TransportState(
int maxMessageSize, int maxMessageSize,
@ -269,18 +269,14 @@ public abstract class AbstractClientStream extends AbstractStream
@Override @Override
public void deframerClosed(boolean hasPartialMessage) { public void deframerClosed(boolean hasPartialMessage) {
deframerClosed = true;
if (trailerStatus != null) {
if (trailerStatus.isOk() && hasPartialMessage) {
trailerStatus = Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame");
trailers = new Metadata();
}
transportReportStatus(trailerStatus, false, trailers);
} else {
checkState(statusReported, "status should have been reported on deframer closed"); checkState(statusReported, "status should have been reported on deframer closed");
deframerClosed = true;
if (statusReportedIsOk && hasPartialMessage) {
transportReportStatus(
Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"),
true,
new Metadata());
} }
if (deframerClosedTask != null) { if (deframerClosedTask != null) {
deframerClosedTask.run(); deframerClosedTask.run();
deframerClosedTask = null; deframerClosedTask = null;
@ -387,10 +383,8 @@ public abstract class AbstractClientStream extends AbstractStream
new Object[]{status, trailers}); new Object[]{status, trailers});
return; return;
} }
this.trailers = trailers;
statsTraceCtx.clientInboundTrailers(trailers); statsTraceCtx.clientInboundTrailers(trailers);
trailerStatus = status; transportReportStatus(status, false, trailers);
closeDeframer(false);
} }
/** /**
@ -419,14 +413,16 @@ public abstract class AbstractClientStream extends AbstractStream
* {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)} * {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)}
* will receive * will receive
* @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
* may already be queued up in the deframer. If {@code false}, the listener will be * may already be queued up in the deframer and overrides any previously queued status.
* notified immediately after all currently completed messages in the deframer have been * If {@code false}, the listener will be notified immediately after all currently
* delivered to the application. * completed messages in the deframer have been delivered to the application.
* @param trailers new instance of {@code Trailers}, either empty or those returned by the * @param trailers new instance of {@code Trailers}, either empty or those returned by the
* server * server
*/ */
public final void transportReportStatus( public final void transportReportStatus(
final Status status, final RpcProgress rpcProgress, boolean stopDelivery, final Status status,
final RpcProgress rpcProgress,
boolean stopDelivery,
final Metadata trailers) { final Metadata trailers) {
checkNotNull(status, "status"); checkNotNull(status, "status");
checkNotNull(trailers, "trailers"); checkNotNull(trailers, "trailers");
@ -435,6 +431,7 @@ public abstract class AbstractClientStream extends AbstractStream
return; return;
} }
statusReported = true; statusReported = true;
statusReportedIsOk = status.isOk();
onStreamDeallocated(); onStreamDeallocated();
if (deframerClosed) { if (deframerClosed) {

View File

@ -338,6 +338,24 @@ public class AbstractClientStreamTest {
assertEquals("rst___stream", statusCaptor.getValue().getDescription()); assertEquals("rst___stream", statusCaptor.getValue().getDescription());
} }
@Test
public void statusOkFollowedByRstStreamNoError() {
AbstractClientStream stream =
new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer);
stream.start(mockListener);
stream.transportState().deframe(ReadableBuffers.wrap(new byte[] {0, 0, 0, 0, 1, 1}));
stream.transportState().inboundTrailersReceived(new Metadata(), Status.OK);
Status status = Status.INTERNAL.withDescription("rst___stream");
// Simulate getting a reset
stream.transportState().transportReportStatus(status, false /*stop delivery*/, new Metadata());
stream.transportState().requestMessagesFromDeframer(1);
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(mockListener)
.closed(statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class));
assertTrue(statusCaptor.getValue().isOk());
}
@Test @Test
public void trailerOkWithTruncatedMessage() { public void trailerOkWithTruncatedMessage() {
AbstractClientStream stream = AbstractClientStream stream =

View File

@ -483,6 +483,27 @@ public class OkHttpClientTransportTest {
shutdownAndVerify(); shutdownAndVerify();
} }
@Test
public void receiveResetNoError() throws Exception {
initTransport();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);
assertContainStream(3);
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
Buffer buffer = createMessageFrame("a message");
frameHandler().data(false, 3, buffer, (int) buffer.size());
frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
frameHandler().rstStream(3, ErrorCode.NO_ERROR);
stream.request(1);
listener.waitUntilStreamClosed();
assertTrue(listener.status.isOk());
shutdownAndVerify();
}
@Test @Test
public void cancelStream() throws Exception { public void cancelStream() throws Exception {
initTransport(); initTransport();