Proper buffer closure when receiving DATA with EOS

The Http2ClientStream should not close the buffer in this case since
it's already been given to the deframer and potentially to the user.

Added cleanup code to MessageDeframer and AbstractClientStream to make
sure that we free the Buffer when appropriate.
This commit is contained in:
nmittler 2015-03-09 12:52:53 -07:00
parent 456216b364
commit 4deff027ba
4 changed files with 63 additions and 24 deletions

View File

@ -88,8 +88,12 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
* responsible for properly closing streams when protocol errors occur. * responsible for properly closing streams when protocol errors occur.
* *
* @param errorStatus the error to report * @param errorStatus the error to report
* @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
* may already be queued up in the deframer. If {@code false}, the listener will be
* notified immediately after all currently completed messages in the deframer have been
* delivered to the application.
*/ */
protected void inboundTransportError(Status errorStatus) { protected void inboundTransportError(Status errorStatus, boolean stopDelivery) {
if (inboundPhase() == Phase.STATUS) { if (inboundPhase() == Phase.STATUS) {
log.log(Level.INFO, "Received transport error on closed stream {0} {1}", log.log(Level.INFO, "Received transport error on closed stream {0} {1}",
new Object[]{id(), errorStatus}); new Object[]{id(), errorStatus});
@ -97,7 +101,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
} }
// For transport errors we immediately report status to the application layer // For transport errors we immediately report status to the application layer
// and do not wait for additional payloads. // and do not wait for additional payloads.
transportReportStatus(errorStatus, false, new Metadata.Trailers()); transportReportStatus(errorStatus, stopDelivery, new Metadata.Trailers());
} }
/** /**
@ -121,22 +125,30 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
* Processes the contents of a received data frame from the server. * Processes the contents of a received data frame from the server.
* *
* @param frame the received data frame. Its ownership is transferred to this method. * @param frame the received data frame. Its ownership is transferred to this method.
* @param
*/ */
protected void inboundDataReceived(Buffer frame) { protected void inboundDataReceived(Buffer frame) {
Preconditions.checkNotNull(frame, "frame"); Preconditions.checkNotNull(frame, "frame");
boolean needToCloseFrame = true;
try {
if (inboundPhase() == Phase.STATUS) { if (inboundPhase() == Phase.STATUS) {
frame.close();
return; return;
} }
if (inboundPhase() == Phase.HEADERS) { if (inboundPhase() == Phase.HEADERS) {
// Have not received headers yet so error // Have not received headers yet so error
inboundTransportError(Status.INTERNAL.withDescription("headers not received before payload")); inboundTransportError(Status.INTERNAL
frame.close(); .withDescription("headers not received before payload"), false);
return; return;
} }
inboundPhase(Phase.MESSAGE); inboundPhase(Phase.MESSAGE);
needToCloseFrame = false;
deframe(frame, false); deframe(frame, false);
} finally {
if (needToCloseFrame) {
frame.close();
}
}
} }
@Override @Override

View File

@ -125,7 +125,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
Buffers.readAsString(frame, errorCharset)); Buffers.readAsString(frame, errorCharset));
frame.close(); frame.close();
if (transportError.getDescription().length() > 1000 || endOfStream) { if (transportError.getDescription().length() > 1000 || endOfStream) {
inboundTransportError(transportError); inboundTransportError(transportError, false);
if (!endOfStream) { if (!endOfStream) {
// We have enough error detail so lets cancel. // We have enough error detail so lets cancel.
sendCancel(); sendCancel();
@ -136,8 +136,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
if (endOfStream) { if (endOfStream) {
// This is a protocol violation as we expect to receive trailers. // This is a protocol violation as we expect to receive trailers.
transportError = Status.INTERNAL.withDescription("Recevied EOS on DATA frame"); transportError = Status.INTERNAL.withDescription("Recevied EOS on DATA frame");
frame.close(); inboundTransportError(transportError, true);
inboundTransportError(transportError);
} }
} }
} }
@ -156,7 +155,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
transportError = checkContentType(trailers); transportError = checkContentType(trailers);
} }
if (transportError != null) { if (transportError != null) {
inboundTransportError(transportError); inboundTransportError(transportError, false);
} else { } else {
Status status = statusFromTrailers(trailers); Status status = statusFromTrailers(trailers);
stripTransportDetails(trailers); stripTransportDetails(trailers);

View File

@ -153,14 +153,23 @@ public class MessageDeframer implements Closeable {
* {@code endOfStream=true}. * {@code endOfStream=true}.
*/ */
public void deframe(Buffer data, boolean endOfStream) { public void deframe(Buffer data, boolean endOfStream) {
checkNotClosed();
Preconditions.checkNotNull(data, "data"); Preconditions.checkNotNull(data, "data");
boolean needToCloseData = true;
try {
checkNotClosed();
Preconditions.checkState(!this.endOfStream, "Past end of stream"); Preconditions.checkState(!this.endOfStream, "Past end of stream");
needToCloseData = false;
unprocessed.addBuffer(data); unprocessed.addBuffer(data);
// Indicate that all of the data for this stream has been received. // Indicate that all of the data for this stream has been received.
this.endOfStream = endOfStream; this.endOfStream = endOfStream;
deliver(); deliver();
} finally {
if (needToCloseData) {
data.close();
}
}
} }
/** /**

View File

@ -253,6 +253,25 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
verify(listener).closed(eq(Status.CANCELLED), eq(trailers)); verify(listener).closed(eq(Status.CANCELLED), eq(trailers));
} }
@Test
public void dataFrameWithEosShouldDeframeAndThenFail() {
stream().id(1);
stream().request(1);
// Receive headers first so that it's a valid GRPC response.
stream().transportHeadersReceived(grpcResponseHeaders(), false);
// Receive a DATA frame with EOS set.
stream().transportDataReceived(simpleGrpcFrame(), true);
// Verify that the message was delivered.
verify(listener).messageRead(any(InputStream.class));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).closed(captor.capture(), any(Metadata.Trailers.class));
assertEquals(Status.Code.INTERNAL, captor.getValue().getCode());
}
@Override @Override
protected AbstractStream<Integer> createStream() { protected AbstractStream<Integer> createStream() {
AbstractStream<Integer> stream = new NettyClientStream(listener, channel, handler); AbstractStream<Integer> stream = new NettyClientStream(listener, channel, handler);