okhttp: Clean up stream when error happens.

Resolves #279
This commit is contained in:
Xudong Ma 2015-04-14 16:49:17 +08:00
parent 4b00476d33
commit d0aad72441
4 changed files with 82 additions and 24 deletions

View File

@ -127,11 +127,9 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
frame.close(); frame.close();
if (transportError.getDescription().length() > 1000 || endOfStream) { if (transportError.getDescription().length() > 1000 || endOfStream) {
inboundTransportError(transportError); inboundTransportError(transportError);
if (!endOfStream) {
// We have enough error detail so lets cancel. // We have enough error detail so lets cancel.
sendCancel(); sendCancel();
} }
}
} else { } else {
inboundDataReceived(frame); inboundDataReceived(frame);
if (endOfStream) { if (endOfStream) {
@ -157,6 +155,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
} }
if (transportError != null) { if (transportError != null) {
inboundTransportError(transportError); inboundTransportError(transportError);
sendCancel();
} else { } else {
Status status = statusFromTrailers(trailers); Status status = statusFromTrailers(trailers);
stripTransportDetails(trailers); stripTransportDetails(trailers);

View File

@ -146,9 +146,7 @@ class OkHttpClientStream extends Http2ClientStream {
frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR); frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR);
Status status = Status.INTERNAL.withDescription( Status status = Status.INTERNAL.withDescription(
"Received data size exceeded our receiving window size"); "Received data size exceeded our receiving window size");
if (transport.finishStream(id(), status)) { transport.finishStream(id(), status, null);
transport.stopIfNecessary();
}
return; return;
} }
super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream); super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
@ -187,18 +185,13 @@ class OkHttpClientStream extends Http2ClientStream {
@Override @Override
protected void sendCancel() { protected void sendCancel() {
if (transport.finishStream(id(), Status.CANCELLED)) { transport.finishStream(id(), Status.CANCELLED, ErrorCode.CANCEL);
frameWriter.rstStream(id(), ErrorCode.CANCEL);
transport.stopIfNecessary();
}
} }
@Override @Override
public void remoteEndClosed() { public void remoteEndClosed() {
super.remoteEndClosed(); super.remoteEndClosed();
if (transport.finishStream(id(), null)) { transport.finishStream(id(), null, null);
transport.stopIfNecessary();
}
} }
void setOutboundFlowState(Object outboundFlowState) { void setOutboundFlowState(Object outboundFlowState) {

View File

@ -263,14 +263,20 @@ public class OkHttpClientTransport implements ClientTransport {
} }
} }
private void startPendingStreams() { /**
* Starts pending streams, returns true if at least one pending stream is started.
*/
private boolean startPendingStreams() {
boolean hasStreamStarted = false;
synchronized (lock) { synchronized (lock) {
while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) { while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
PendingStream pendingStream = pendingStreams.poll(); PendingStream pendingStream = pendingStreams.poll();
startStream(pendingStream.clientStream, pendingStream.requestHeaders); startStream(pendingStream.clientStream, pendingStream.requestHeaders);
pendingStream.createdFuture.set(null); pendingStream.createdFuture.set(null);
hasStreamStarted = true;
} }
} }
return hasStreamStarted;
} }
private void failPendingStreams(Status status) { private void failPendingStreams(Status status) {
@ -389,22 +395,33 @@ public class OkHttpClientTransport implements ClientTransport {
} }
/** /**
* Called when a stream is closed. * Called when a stream is closed, we do things like:
* <ul>
* <li>Removing the stream from the map.
* <li>Optionally reporting the status.
* <li>Starting pending streams if we can.
* <li>Stopping the transport if this is the last live stream under a go-away status.
* </ul>
* *
* <p> Return false if the stream has already finished. * @param streamId the Id of the stream.
* @param status the final status of this stream, null means no need to report.
* @Param errorCode reset the stream with this ErrorCode if not null.
*/ */
boolean finishStream(int streamId, @Nullable Status status) { void finishStream(int streamId, @Nullable Status status, @Nullable ErrorCode errorCode) {
OkHttpClientStream stream; OkHttpClientStream stream;
stream = streams.remove(streamId); stream = streams.remove(streamId);
if (stream != null) { if (stream != null) {
if (errorCode != null) {
frameWriter.rstStream(streamId, ErrorCode.CANCEL);
}
if (status != null) { if (status != null) {
boolean isCancelled = status.getCode() == Code.CANCELLED; boolean isCancelled = status.getCode() == Code.CANCELLED;
stream.transportReportStatus(status, isCancelled, new Metadata.Trailers()); stream.transportReportStatus(status, isCancelled, new Metadata.Trailers());
} }
startPendingStreams(); if (!startPendingStreams()) {
return true; stopIfNecessary();
}
} }
return false;
} }
/** /**
@ -523,9 +540,7 @@ public class OkHttpClientTransport implements ClientTransport {
@Override @Override
public void rstStream(int streamId, ErrorCode errorCode) { public void rstStream(int streamId, ErrorCode errorCode) {
if (finishStream(streamId, toGrpcStatus(errorCode))) { finishStream(streamId, toGrpcStatus(errorCode), null);
stopIfNecessary();
}
} }
@Override @Override

View File

@ -137,6 +137,7 @@ public class OkHttpClientTransportTest {
@After @After
public void tearDown() { public void tearDown() {
clientTransport.shutdown(); clientTransport.shutdown();
assertEquals(0, streams.size());
verify(frameWriter).close(); verify(frameWriter).close();
frameReader.assertClosed(); frameReader.assertClosed();
executor.shutdown(); executor.shutdown();
@ -639,6 +640,56 @@ public class OkHttpClientTransportTest {
stream.cancel(); stream.cancel();
} }
@Test
public void receiveDataWithoutHeader() throws Exception {
MockStreamListener listener = new MockStreamListener();
clientTransport.newStream(method,new Metadata.Headers(), listener).request(1);
Buffer buffer = createMessageFrame(new byte[1]);
frameHandler.data(false, 3, buffer, (int) buffer.size());
// Trigger the failure by a trailer.
frameHandler.headers(
true, true, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
listener.waitUntilStreamClosed();
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
assertTrue(listener.status.getDescription().startsWith("no headers received prior to data"));
assertEquals(0, listener.messages.size());
}
@Test
public void receiveDataWithoutHeaderAndTrailer() throws Exception {
MockStreamListener listener = new MockStreamListener();
clientTransport.newStream(method,new Metadata.Headers(), listener).request(1);
Buffer buffer = createMessageFrame(new byte[1]);
frameHandler.data(false, 3, buffer, (int) buffer.size());
// Trigger the failure by a data frame.
buffer = createMessageFrame(new byte[1]);
frameHandler.data(true, 3, buffer, (int) buffer.size());
listener.waitUntilStreamClosed();
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
assertTrue(listener.status.getDescription().startsWith("no headers received prior to data"));
assertEquals(0, listener.messages.size());
}
@Test
public void receiveLongEnoughDataWithoutHeaderAndTrailer() throws Exception {
MockStreamListener listener = new MockStreamListener();
clientTransport.newStream(method,new Metadata.Headers(), listener).request(1);
Buffer buffer = createMessageFrame(new byte[1000]);
frameHandler.data(false, 3, buffer, (int) buffer.size());
// Once we receive enough detail, we cancel the stream. so we should have sent cancel.
verify(frameWriter).rstStream(eq(3), eq(ErrorCode.CANCEL));
listener.waitUntilStreamClosed();
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
assertTrue(listener.status.getDescription().startsWith("no headers received prior to data"));
assertEquals(0, listener.messages.size());
}
private void waitForStreamPending(int expected) throws Exception { private void waitForStreamPending(int expected) throws Exception {
int duration = TIME_OUT_MS / 10; int duration = TIME_OUT_MS / 10;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {