Merge pull request #177 from nmittler/irce

Proper buffer closure when receiving DATA with EOS
This commit is contained in:
Nathan Mittler 2015-03-09 13:29:31 -07:00
commit 2c07e23911
4 changed files with 63 additions and 24 deletions

View File

@ -88,8 +88,12 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
* responsible for properly closing streams when protocol errors occur.
*
* @param errorStatus the error to report
* @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
* may already be queued up in the deframer. If {@code false}, the listener will be
* notified immediately after all currently completed messages in the deframer have been
* delivered to the application.
*/
protected void inboundTransportError(Status errorStatus) {
protected void inboundTransportError(Status errorStatus, boolean stopDelivery) {
if (inboundPhase() == Phase.STATUS) {
log.log(Level.INFO, "Received transport error on closed stream {0} {1}",
new Object[]{id(), errorStatus});
@ -97,7 +101,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
}
// For transport errors we immediately report status to the application layer
// and do not wait for additional payloads.
transportReportStatus(errorStatus, false, new Metadata.Trailers());
transportReportStatus(errorStatus, stopDelivery, new Metadata.Trailers());
}
/**
@ -121,22 +125,30 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
* Processes the contents of a received data frame from the server.
*
* @param frame the received data frame. Its ownership is transferred to this method.
* @param
*/
protected void inboundDataReceived(Buffer frame) {
Preconditions.checkNotNull(frame, "frame");
if (inboundPhase() == Phase.STATUS) {
frame.close();
return;
}
if (inboundPhase() == Phase.HEADERS) {
// Have not received headers yet so error
inboundTransportError(Status.INTERNAL.withDescription("headers not received before payload"));
frame.close();
return;
}
inboundPhase(Phase.MESSAGE);
boolean needToCloseFrame = true;
try {
if (inboundPhase() == Phase.STATUS) {
return;
}
if (inboundPhase() == Phase.HEADERS) {
// Have not received headers yet so error
inboundTransportError(Status.INTERNAL
.withDescription("headers not received before payload"), false);
return;
}
inboundPhase(Phase.MESSAGE);
deframe(frame, false);
needToCloseFrame = false;
deframe(frame, false);
} finally {
if (needToCloseFrame) {
frame.close();
}
}
}
@Override

View File

@ -125,7 +125,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
Buffers.readAsString(frame, errorCharset));
frame.close();
if (transportError.getDescription().length() > 1000 || endOfStream) {
inboundTransportError(transportError);
inboundTransportError(transportError, false);
if (!endOfStream) {
// We have enough error detail so lets cancel.
sendCancel();
@ -136,8 +136,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
if (endOfStream) {
// This is a protocol violation as we expect to receive trailers.
transportError = Status.INTERNAL.withDescription("Recevied EOS on DATA frame");
frame.close();
inboundTransportError(transportError);
inboundTransportError(transportError, true);
}
}
}
@ -156,7 +155,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
transportError = checkContentType(trailers);
}
if (transportError != null) {
inboundTransportError(transportError);
inboundTransportError(transportError, false);
} else {
Status status = statusFromTrailers(trailers);
stripTransportDetails(trailers);

View File

@ -153,14 +153,23 @@ public class MessageDeframer implements Closeable {
* {@code endOfStream=true}.
*/
public void deframe(Buffer data, boolean endOfStream) {
checkNotClosed();
Preconditions.checkNotNull(data, "data");
Preconditions.checkState(!this.endOfStream, "Past end of stream");
unprocessed.addBuffer(data);
boolean needToCloseData = true;
try {
checkNotClosed();
Preconditions.checkState(!this.endOfStream, "Past end of stream");
// Indicate that all of the data for this stream has been received.
this.endOfStream = endOfStream;
deliver();
needToCloseData = false;
unprocessed.addBuffer(data);
// Indicate that all of the data for this stream has been received.
this.endOfStream = endOfStream;
deliver();
} finally {
if (needToCloseData) {
data.close();
}
}
}
/**

View File

@ -253,6 +253,25 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
verify(listener).closed(eq(Status.CANCELLED), eq(trailers));
}
@Test
public void dataFrameWithEosShouldDeframeAndThenFail() {
stream().id(1);
stream().request(1);
// Receive headers first so that it's a valid GRPC response.
stream().transportHeadersReceived(grpcResponseHeaders(), false);
// Receive a DATA frame with EOS set.
stream().transportDataReceived(simpleGrpcFrame(), true);
// Verify that the message was delivered.
verify(listener).messageRead(any(InputStream.class));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).closed(captor.capture(), any(Metadata.Trailers.class));
assertEquals(Status.Code.INTERNAL, captor.getValue().getCode());
}
@Override
protected AbstractStream<Integer> createStream() {
AbstractStream<Integer> stream = new NettyClientStream(listener, channel, handler);