diff --git a/core/src/main/java/io/grpc/transport/AbstractClientStream.java b/core/src/main/java/io/grpc/transport/AbstractClientStream.java index 8a59890a7c..55fb78662e 100644 --- a/core/src/main/java/io/grpc/transport/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractClientStream.java @@ -88,8 +88,12 @@ public abstract class AbstractClientStream extends AbstractStream * responsible for properly closing streams when protocol errors occur. * * @param errorStatus the error to report + * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that + * may already be queued up in the deframer. If {@code false}, the listener will be + * notified immediately after all currently completed messages in the deframer have been + * delivered to the application. */ - protected void inboundTransportError(Status errorStatus) { + protected void inboundTransportError(Status errorStatus, boolean stopDelivery) { if (inboundPhase() == Phase.STATUS) { log.log(Level.INFO, "Received transport error on closed stream {0} {1}", new Object[]{id(), errorStatus}); @@ -97,7 +101,7 @@ public abstract class AbstractClientStream extends AbstractStream } // For transport errors we immediately report status to the application layer // and do not wait for additional payloads. - transportReportStatus(errorStatus, false, new Metadata.Trailers()); + transportReportStatus(errorStatus, stopDelivery, new Metadata.Trailers()); } /** @@ -121,22 +125,30 @@ public abstract class AbstractClientStream extends AbstractStream * Processes the contents of a received data frame from the server. * * @param frame the received data frame. Its ownership is transferred to this method. + * @param */ protected void inboundDataReceived(Buffer frame) { Preconditions.checkNotNull(frame, "frame"); - if (inboundPhase() == Phase.STATUS) { - frame.close(); - return; - } - if (inboundPhase() == Phase.HEADERS) { - // Have not received headers yet so error - inboundTransportError(Status.INTERNAL.withDescription("headers not received before payload")); - frame.close(); - return; - } - inboundPhase(Phase.MESSAGE); + boolean needToCloseFrame = true; + try { + if (inboundPhase() == Phase.STATUS) { + return; + } + if (inboundPhase() == Phase.HEADERS) { + // Have not received headers yet so error + inboundTransportError(Status.INTERNAL + .withDescription("headers not received before payload"), false); + return; + } + inboundPhase(Phase.MESSAGE); - deframe(frame, false); + needToCloseFrame = false; + deframe(frame, false); + } finally { + if (needToCloseFrame) { + frame.close(); + } + } } @Override diff --git a/core/src/main/java/io/grpc/transport/Http2ClientStream.java b/core/src/main/java/io/grpc/transport/Http2ClientStream.java index 4938cc4482..c9f09a59aa 100644 --- a/core/src/main/java/io/grpc/transport/Http2ClientStream.java +++ b/core/src/main/java/io/grpc/transport/Http2ClientStream.java @@ -125,7 +125,7 @@ public abstract class Http2ClientStream extends AbstractClientStream { Buffers.readAsString(frame, errorCharset)); frame.close(); if (transportError.getDescription().length() > 1000 || endOfStream) { - inboundTransportError(transportError); + inboundTransportError(transportError, false); if (!endOfStream) { // We have enough error detail so lets cancel. sendCancel(); @@ -136,8 +136,7 @@ public abstract class Http2ClientStream extends AbstractClientStream { if (endOfStream) { // This is a protocol violation as we expect to receive trailers. transportError = Status.INTERNAL.withDescription("Recevied EOS on DATA frame"); - frame.close(); - inboundTransportError(transportError); + inboundTransportError(transportError, true); } } } @@ -156,7 +155,7 @@ public abstract class Http2ClientStream extends AbstractClientStream { transportError = checkContentType(trailers); } if (transportError != null) { - inboundTransportError(transportError); + inboundTransportError(transportError, false); } else { Status status = statusFromTrailers(trailers); stripTransportDetails(trailers); diff --git a/core/src/main/java/io/grpc/transport/MessageDeframer.java b/core/src/main/java/io/grpc/transport/MessageDeframer.java index cfab174759..183e9690a9 100644 --- a/core/src/main/java/io/grpc/transport/MessageDeframer.java +++ b/core/src/main/java/io/grpc/transport/MessageDeframer.java @@ -153,14 +153,23 @@ public class MessageDeframer implements Closeable { * {@code endOfStream=true}. */ public void deframe(Buffer data, boolean endOfStream) { - checkNotClosed(); Preconditions.checkNotNull(data, "data"); - Preconditions.checkState(!this.endOfStream, "Past end of stream"); - unprocessed.addBuffer(data); + boolean needToCloseData = true; + try { + checkNotClosed(); + Preconditions.checkState(!this.endOfStream, "Past end of stream"); - // Indicate that all of the data for this stream has been received. - this.endOfStream = endOfStream; - deliver(); + needToCloseData = false; + unprocessed.addBuffer(data); + + // Indicate that all of the data for this stream has been received. + this.endOfStream = endOfStream; + deliver(); + } finally { + if (needToCloseData) { + data.close(); + } + } } /** diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java index ba2026feb7..e01104f2c7 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java @@ -253,6 +253,25 @@ public class NettyClientStreamTest extends NettyStreamTestBase { verify(listener).closed(eq(Status.CANCELLED), eq(trailers)); } + @Test + public void dataFrameWithEosShouldDeframeAndThenFail() { + stream().id(1); + stream().request(1); + + // Receive headers first so that it's a valid GRPC response. + stream().transportHeadersReceived(grpcResponseHeaders(), false); + + // Receive a DATA frame with EOS set. + stream().transportDataReceived(simpleGrpcFrame(), true); + + // Verify that the message was delivered. + verify(listener).messageRead(any(InputStream.class)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); + verify(listener).closed(captor.capture(), any(Metadata.Trailers.class)); + assertEquals(Status.Code.INTERNAL, captor.getValue().getCode()); + } + @Override protected AbstractStream createStream() { AbstractStream stream = new NettyClientStream(listener, channel, handler);