From 4deff027baf7d08bd2e5240992d06ea9f233c058 Mon Sep 17 00:00:00 2001 From: nmittler Date: Mon, 9 Mar 2015 12:52:53 -0700 Subject: [PATCH] Proper buffer closure when receiving DATA with EOS The Http2ClientStream should not close the buffer in this case since it's already been given to the deframer and potentially to the user. Added cleanup code to MessageDeframer and AbstractClientStream to make sure that we free the Buffer when appropriate. --- .../grpc/transport/AbstractClientStream.java | 40 ++++++++++++------- .../io/grpc/transport/Http2ClientStream.java | 7 ++-- .../io/grpc/transport/MessageDeframer.java | 21 +++++++--- .../netty/NettyClientStreamTest.java | 19 +++++++++ 4 files changed, 63 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/io/grpc/transport/AbstractClientStream.java b/core/src/main/java/io/grpc/transport/AbstractClientStream.java index 8a59890a7c..55fb78662e 100644 --- a/core/src/main/java/io/grpc/transport/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractClientStream.java @@ -88,8 +88,12 @@ public abstract class AbstractClientStream extends AbstractStream * responsible for properly closing streams when protocol errors occur. * * @param errorStatus the error to report + * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that + * may already be queued up in the deframer. If {@code false}, the listener will be + * notified immediately after all currently completed messages in the deframer have been + * delivered to the application. */ - protected void inboundTransportError(Status errorStatus) { + protected void inboundTransportError(Status errorStatus, boolean stopDelivery) { if (inboundPhase() == Phase.STATUS) { log.log(Level.INFO, "Received transport error on closed stream {0} {1}", new Object[]{id(), errorStatus}); @@ -97,7 +101,7 @@ public abstract class AbstractClientStream extends AbstractStream } // For transport errors we immediately report status to the application layer // and do not wait for additional payloads. - transportReportStatus(errorStatus, false, new Metadata.Trailers()); + transportReportStatus(errorStatus, stopDelivery, new Metadata.Trailers()); } /** @@ -121,22 +125,30 @@ public abstract class AbstractClientStream extends AbstractStream * Processes the contents of a received data frame from the server. * * @param frame the received data frame. Its ownership is transferred to this method. + * @param */ protected void inboundDataReceived(Buffer frame) { Preconditions.checkNotNull(frame, "frame"); - if (inboundPhase() == Phase.STATUS) { - frame.close(); - return; - } - if (inboundPhase() == Phase.HEADERS) { - // Have not received headers yet so error - inboundTransportError(Status.INTERNAL.withDescription("headers not received before payload")); - frame.close(); - return; - } - inboundPhase(Phase.MESSAGE); + boolean needToCloseFrame = true; + try { + if (inboundPhase() == Phase.STATUS) { + return; + } + if (inboundPhase() == Phase.HEADERS) { + // Have not received headers yet so error + inboundTransportError(Status.INTERNAL + .withDescription("headers not received before payload"), false); + return; + } + inboundPhase(Phase.MESSAGE); - deframe(frame, false); + needToCloseFrame = false; + deframe(frame, false); + } finally { + if (needToCloseFrame) { + frame.close(); + } + } } @Override diff --git a/core/src/main/java/io/grpc/transport/Http2ClientStream.java b/core/src/main/java/io/grpc/transport/Http2ClientStream.java index 4938cc4482..c9f09a59aa 100644 --- a/core/src/main/java/io/grpc/transport/Http2ClientStream.java +++ b/core/src/main/java/io/grpc/transport/Http2ClientStream.java @@ -125,7 +125,7 @@ public abstract class Http2ClientStream extends AbstractClientStream { Buffers.readAsString(frame, errorCharset)); frame.close(); if (transportError.getDescription().length() > 1000 || endOfStream) { - inboundTransportError(transportError); + inboundTransportError(transportError, false); if (!endOfStream) { // We have enough error detail so lets cancel. sendCancel(); @@ -136,8 +136,7 @@ public abstract class Http2ClientStream extends AbstractClientStream { if (endOfStream) { // This is a protocol violation as we expect to receive trailers. transportError = Status.INTERNAL.withDescription("Recevied EOS on DATA frame"); - frame.close(); - inboundTransportError(transportError); + inboundTransportError(transportError, true); } } } @@ -156,7 +155,7 @@ public abstract class Http2ClientStream extends AbstractClientStream { transportError = checkContentType(trailers); } if (transportError != null) { - inboundTransportError(transportError); + inboundTransportError(transportError, false); } else { Status status = statusFromTrailers(trailers); stripTransportDetails(trailers); diff --git a/core/src/main/java/io/grpc/transport/MessageDeframer.java b/core/src/main/java/io/grpc/transport/MessageDeframer.java index cfab174759..183e9690a9 100644 --- a/core/src/main/java/io/grpc/transport/MessageDeframer.java +++ b/core/src/main/java/io/grpc/transport/MessageDeframer.java @@ -153,14 +153,23 @@ public class MessageDeframer implements Closeable { * {@code endOfStream=true}. */ public void deframe(Buffer data, boolean endOfStream) { - checkNotClosed(); Preconditions.checkNotNull(data, "data"); - Preconditions.checkState(!this.endOfStream, "Past end of stream"); - unprocessed.addBuffer(data); + boolean needToCloseData = true; + try { + checkNotClosed(); + Preconditions.checkState(!this.endOfStream, "Past end of stream"); - // Indicate that all of the data for this stream has been received. - this.endOfStream = endOfStream; - deliver(); + needToCloseData = false; + unprocessed.addBuffer(data); + + // Indicate that all of the data for this stream has been received. + this.endOfStream = endOfStream; + deliver(); + } finally { + if (needToCloseData) { + data.close(); + } + } } /** diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java index ba2026feb7..e01104f2c7 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java @@ -253,6 +253,25 @@ public class NettyClientStreamTest extends NettyStreamTestBase { verify(listener).closed(eq(Status.CANCELLED), eq(trailers)); } + @Test + public void dataFrameWithEosShouldDeframeAndThenFail() { + stream().id(1); + stream().request(1); + + // Receive headers first so that it's a valid GRPC response. + stream().transportHeadersReceived(grpcResponseHeaders(), false); + + // Receive a DATA frame with EOS set. + stream().transportDataReceived(simpleGrpcFrame(), true); + + // Verify that the message was delivered. + verify(listener).messageRead(any(InputStream.class)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); + verify(listener).closed(captor.capture(), any(Metadata.Trailers.class)); + assertEquals(Status.Code.INTERNAL, captor.getValue().getCode()); + } + @Override protected AbstractStream createStream() { AbstractStream stream = new NettyClientStream(listener, channel, handler);