From 1d97b50315c9aaf1a0f3d34957edb1770353a4e4 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Thu, 24 Jan 2019 08:50:09 -0800 Subject: [PATCH] core: do not lose status when RST_STREAM with NO_ERROR received (#5264) --- .../grpc/internal/AbstractClientStream.java | 35 +++++++++---------- .../internal/AbstractClientStreamTest.java | 18 ++++++++++ .../okhttp/OkHttpClientTransportTest.java | 21 +++++++++++ 3 files changed, 55 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 4a7ce85a7e..2d44decf13 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -240,8 +240,8 @@ public abstract class AbstractClientStream extends AbstractStream * #listenerClosed} because there may still be messages buffered to deliver to the application. */ private boolean statusReported; - private Metadata trailers; - private Status trailerStatus; + /** True if the status reported (set via {@link #transportReportStatus}) is OK. */ + private boolean statusReportedIsOk; protected TransportState( int maxMessageSize, @@ -269,18 +269,14 @@ public abstract class AbstractClientStream extends AbstractStream @Override public void deframerClosed(boolean hasPartialMessage) { + checkState(statusReported, "status should have been reported on deframer closed"); deframerClosed = true; - - if (trailerStatus != null) { - if (trailerStatus.isOk() && hasPartialMessage) { - trailerStatus = Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"); - trailers = new Metadata(); - } - transportReportStatus(trailerStatus, false, trailers); - } else { - checkState(statusReported, "status should have been reported on deframer closed"); + if (statusReportedIsOk && hasPartialMessage) { + transportReportStatus( + Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"), + true, + new Metadata()); } - if (deframerClosedTask != null) { deframerClosedTask.run(); deframerClosedTask = null; @@ -387,10 +383,8 @@ public abstract class AbstractClientStream extends AbstractStream new Object[]{status, trailers}); return; } - this.trailers = trailers; statsTraceCtx.clientInboundTrailers(trailers); - trailerStatus = status; - closeDeframer(false); + transportReportStatus(status, false, trailers); } /** @@ -419,14 +413,16 @@ public abstract class AbstractClientStream extends AbstractStream * {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)} * will receive * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that - * may already be queued up in the deframer. If {@code false}, the listener will be - * notified immediately after all currently completed messages in the deframer have been - * delivered to the application. + * may already be queued up in the deframer and overrides any previously queued status. + * If {@code false}, the listener will be notified immediately after all currently + * completed messages in the deframer have been delivered to the application. * @param trailers new instance of {@code Trailers}, either empty or those returned by the * server */ public final void transportReportStatus( - final Status status, final RpcProgress rpcProgress, boolean stopDelivery, + final Status status, + final RpcProgress rpcProgress, + boolean stopDelivery, final Metadata trailers) { checkNotNull(status, "status"); checkNotNull(trailers, "trailers"); @@ -435,6 +431,7 @@ public abstract class AbstractClientStream extends AbstractStream return; } statusReported = true; + statusReportedIsOk = status.isOk(); onStreamDeallocated(); if (deframerClosed) { diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index d370007201..e400757030 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -338,6 +338,24 @@ public class AbstractClientStreamTest { assertEquals("rst___stream", statusCaptor.getValue().getDescription()); } + @Test + public void statusOkFollowedByRstStreamNoError() { + AbstractClientStream stream = + new BaseAbstractClientStream(allocator, statsTraceCtx, transportTracer); + stream.start(mockListener); + stream.transportState().deframe(ReadableBuffers.wrap(new byte[] {0, 0, 0, 0, 1, 1})); + stream.transportState().inboundTrailersReceived(new Metadata(), Status.OK); + Status status = Status.INTERNAL.withDescription("rst___stream"); + // Simulate getting a reset + stream.transportState().transportReportStatus(status, false /*stop delivery*/, new Metadata()); + stream.transportState().requestMessagesFromDeframer(1); + + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(mockListener) + .closed(statusCaptor.capture(), any(RpcProgress.class), any(Metadata.class)); + assertTrue(statusCaptor.getValue().isOk()); + } + @Test public void trailerOkWithTruncatedMessage() { AbstractClientStream stream = diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 29c5710743..e8c6398619 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -483,6 +483,27 @@ public class OkHttpClientTransportTest { shutdownAndVerify(); } + + @Test + public void receiveResetNoError() throws Exception { + initTransport(); + MockStreamListener listener = new MockStreamListener(); + OkHttpClientStream stream = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); + stream.start(listener); + assertContainStream(3); + frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); + Buffer buffer = createMessageFrame("a message"); + frameHandler().data(false, 3, buffer, (int) buffer.size()); + frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS); + frameHandler().rstStream(3, ErrorCode.NO_ERROR); + stream.request(1); + listener.waitUntilStreamClosed(); + + assertTrue(listener.status.isOk()); + shutdownAndVerify(); + } + @Test public void cancelStream() throws Exception { initTransport();