From d0aad72441f2f110910003599bb923db6f671f02 Mon Sep 17 00:00:00 2001 From: Xudong Ma Date: Tue, 14 Apr 2015 16:49:17 +0800 Subject: [PATCH] okhttp: Clean up stream when error happens. Resolves #279 --- .../io/grpc/transport/Http2ClientStream.java | 7 ++- .../transport/okhttp/OkHttpClientStream.java | 13 ++--- .../okhttp/OkHttpClientTransport.java | 35 +++++++++---- .../okhttp/OkHttpClientTransportTest.java | 51 +++++++++++++++++++ 4 files changed, 82 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/io/grpc/transport/Http2ClientStream.java b/core/src/main/java/io/grpc/transport/Http2ClientStream.java index 4623aed0c2..badc7c0f20 100644 --- a/core/src/main/java/io/grpc/transport/Http2ClientStream.java +++ b/core/src/main/java/io/grpc/transport/Http2ClientStream.java @@ -127,10 +127,8 @@ public abstract class Http2ClientStream extends AbstractClientStream { frame.close(); if (transportError.getDescription().length() > 1000 || endOfStream) { inboundTransportError(transportError); - if (!endOfStream) { - // We have enough error detail so lets cancel. - sendCancel(); - } + // We have enough error detail so lets cancel. + sendCancel(); } } else { inboundDataReceived(frame); @@ -157,6 +155,7 @@ public abstract class Http2ClientStream extends AbstractClientStream { } if (transportError != null) { inboundTransportError(transportError); + sendCancel(); } else { Status status = statusFromTrailers(trailers); stripTransportDetails(trailers); diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java index 3d24f6e620..24b681dbf4 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java @@ -146,9 +146,7 @@ class OkHttpClientStream extends Http2ClientStream { frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR); Status status = Status.INTERNAL.withDescription( "Received data size exceeded our receiving window size"); - if (transport.finishStream(id(), status)) { - transport.stopIfNecessary(); - } + transport.finishStream(id(), status, null); return; } super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream); @@ -187,18 +185,13 @@ class OkHttpClientStream extends Http2ClientStream { @Override protected void sendCancel() { - if (transport.finishStream(id(), Status.CANCELLED)) { - frameWriter.rstStream(id(), ErrorCode.CANCEL); - transport.stopIfNecessary(); - } + transport.finishStream(id(), Status.CANCELLED, ErrorCode.CANCEL); } @Override public void remoteEndClosed() { super.remoteEndClosed(); - if (transport.finishStream(id(), null)) { - transport.stopIfNecessary(); - } + transport.finishStream(id(), null, null); } void setOutboundFlowState(Object outboundFlowState) { diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java index 569bde29d9..0e37a3884e 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java @@ -263,14 +263,20 @@ public class OkHttpClientTransport implements ClientTransport { } } - private void startPendingStreams() { + /** + * Starts pending streams, returns true if at least one pending stream is started. + */ + private boolean startPendingStreams() { + boolean hasStreamStarted = false; synchronized (lock) { while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) { PendingStream pendingStream = pendingStreams.poll(); startStream(pendingStream.clientStream, pendingStream.requestHeaders); pendingStream.createdFuture.set(null); + hasStreamStarted = true; } } + return hasStreamStarted; } private void failPendingStreams(Status status) { @@ -389,22 +395,33 @@ public class OkHttpClientTransport implements ClientTransport { } /** - * Called when a stream is closed. + * Called when a stream is closed, we do things like: + *
    + *
  • Removing the stream from the map. + *
  • Optionally reporting the status. + *
  • Starting pending streams if we can. + *
  • Stopping the transport if this is the last live stream under a go-away status. + *
* - *

Return false if the stream has already finished. + * @param streamId the Id of the stream. + * @param status the final status of this stream, null means no need to report. + * @Param errorCode reset the stream with this ErrorCode if not null. */ - boolean finishStream(int streamId, @Nullable Status status) { + void finishStream(int streamId, @Nullable Status status, @Nullable ErrorCode errorCode) { OkHttpClientStream stream; stream = streams.remove(streamId); if (stream != null) { + if (errorCode != null) { + frameWriter.rstStream(streamId, ErrorCode.CANCEL); + } if (status != null) { boolean isCancelled = status.getCode() == Code.CANCELLED; stream.transportReportStatus(status, isCancelled, new Metadata.Trailers()); } - startPendingStreams(); - return true; + if (!startPendingStreams()) { + stopIfNecessary(); + } } - return false; } /** @@ -523,9 +540,7 @@ public class OkHttpClientTransport implements ClientTransport { @Override public void rstStream(int streamId, ErrorCode errorCode) { - if (finishStream(streamId, toGrpcStatus(errorCode))) { - stopIfNecessary(); - } + finishStream(streamId, toGrpcStatus(errorCode), null); } @Override diff --git a/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java index e02bd49060..b4351d541d 100644 --- a/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java @@ -137,6 +137,7 @@ public class OkHttpClientTransportTest { @After public void tearDown() { clientTransport.shutdown(); + assertEquals(0, streams.size()); verify(frameWriter).close(); frameReader.assertClosed(); executor.shutdown(); @@ -639,6 +640,56 @@ public class OkHttpClientTransportTest { stream.cancel(); } + @Test + public void receiveDataWithoutHeader() throws Exception { + MockStreamListener listener = new MockStreamListener(); + clientTransport.newStream(method,new Metadata.Headers(), listener).request(1); + Buffer buffer = createMessageFrame(new byte[1]); + frameHandler.data(false, 3, buffer, (int) buffer.size()); + + // Trigger the failure by a trailer. + frameHandler.headers( + true, true, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); + + listener.waitUntilStreamClosed(); + assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); + assertTrue(listener.status.getDescription().startsWith("no headers received prior to data")); + assertEquals(0, listener.messages.size()); + } + + @Test + public void receiveDataWithoutHeaderAndTrailer() throws Exception { + MockStreamListener listener = new MockStreamListener(); + clientTransport.newStream(method,new Metadata.Headers(), listener).request(1); + Buffer buffer = createMessageFrame(new byte[1]); + frameHandler.data(false, 3, buffer, (int) buffer.size()); + + // Trigger the failure by a data frame. + buffer = createMessageFrame(new byte[1]); + frameHandler.data(true, 3, buffer, (int) buffer.size()); + + listener.waitUntilStreamClosed(); + assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); + assertTrue(listener.status.getDescription().startsWith("no headers received prior to data")); + assertEquals(0, listener.messages.size()); + } + + @Test + public void receiveLongEnoughDataWithoutHeaderAndTrailer() throws Exception { + MockStreamListener listener = new MockStreamListener(); + clientTransport.newStream(method,new Metadata.Headers(), listener).request(1); + Buffer buffer = createMessageFrame(new byte[1000]); + frameHandler.data(false, 3, buffer, (int) buffer.size()); + + // Once we receive enough detail, we cancel the stream. so we should have sent cancel. + verify(frameWriter).rstStream(eq(3), eq(ErrorCode.CANCEL)); + + listener.waitUntilStreamClosed(); + assertEquals(Status.INTERNAL.getCode(), listener.status.getCode()); + assertTrue(listener.status.getDescription().startsWith("no headers received prior to data")); + assertEquals(0, listener.messages.size()); + } + private void waitForStreamPending(int expected) throws Exception { int duration = TIME_OUT_MS / 10; for (int i = 0; i < 10; i++) {