From f3a90cd42bc55dd927417fe255f66d0cb357d69f Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 29 Jan 2015 17:30:44 -0800 Subject: [PATCH] netty: Cancel stream if interrupted during create Previously streams were being partially orphaned if there was an interruption during stream creation. To handle cancellation, AbstractClientStream's cancel() had to be changed remove the "optimization" otherwise, again, the stream would be orphaned. --- .../io/grpc/transport/AbstractClientStream.java | 11 ++++------- .../main/java/io/grpc/transport/ClientStream.java | 3 ++- .../grpc/transport/netty/NettyClientTransport.java | 6 +++--- .../transport/netty/NettyClientHandlerTest.java | 14 ++++++++++++++ .../transport/netty/NettyClientStreamTest.java | 4 ++-- 5 files changed, 25 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/io/grpc/transport/AbstractClientStream.java b/core/src/main/java/io/grpc/transport/AbstractClientStream.java index 23cdbcd81a..15dd564eda 100644 --- a/core/src/main/java/io/grpc/transport/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractClientStream.java @@ -260,17 +260,14 @@ public abstract class AbstractClientStream extends AbstractStream @Override public void cancel() { outboundPhase(Phase.STATUS); - if (id() != null) { - // Only send a cancellation to remote side if we have actually been allocated - // a stream id and we are not already closed. i.e. the server side is aware of the stream. - sendCancel(); - } + sendCancel(); dispose(); } /** - * Send a stream cancellation message to the remote server. Can be called by either the - * application or transport layers. + * Cancel the stream and send a stream cancellation message to the remote server, if necessary. + * Can be called by either the application or transport layers. This method is safe to be called + * at any time and multiple times. */ protected abstract void sendCancel(); diff --git a/core/src/main/java/io/grpc/transport/ClientStream.java b/core/src/main/java/io/grpc/transport/ClientStream.java index c3cddc2411..aa17096d87 100644 --- a/core/src/main/java/io/grpc/transport/ClientStream.java +++ b/core/src/main/java/io/grpc/transport/ClientStream.java @@ -39,7 +39,8 @@ public interface ClientStream extends Stream { /** * Used to abnormally terminate the stream. After calling this method, no further messages will be * sent or received, however it may still be possible to receive buffered messages for a brief - * period until {@link ClientStreamListener#closed} is called. + * period until {@link ClientStreamListener#closed} is called. This method is safe to be called + * at any time and multiple times. */ void cancel(); diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java index d1e9141b43..7819768eb5 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java @@ -155,11 +155,11 @@ class NettyClientTransport extends AbstractClientTransport { } catch (InterruptedException e) { // Restore the interrupt. Thread.currentThread().interrupt(); - stream.dispose(); + stream.cancel(); throw new RuntimeException(e); } catch (ExecutionException e) { - stream.dispose(); - throw new RuntimeException(e); + stream.cancel(); + throw new RuntimeException(e.getCause() != null ? e.getCause() : e); } return stream; diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java index e15c857da7..8f9c06faf6 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java @@ -47,6 +47,7 @@ import static org.mockito.Mockito.calls; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import io.grpc.Metadata; @@ -175,6 +176,19 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { verify(promise).setFailure(any(Throwable.class)); } + @Test + public void cancelBeforeStreamAssignedShouldSucceed() throws Exception { + handler.connection().local().maxStreams(0); + handler.write(ctx, new CreateStreamCommand(grpcHeaders, stream), promise); + mockContext(); + verify(stream, never()).id(any(Integer.class)); + when(stream.id()).thenReturn(null); + + handler.write(ctx, new CancelStreamCommand(stream), promise); + verify(promise).setSuccess(); + verifyNoMoreInteractions(ctx); + } + @Test public void sendFrameShouldSucceed() throws Exception { createStream(); diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java index 1efdf8ce24..9fc5ac9ca5 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java @@ -97,9 +97,9 @@ public class NettyClientStreamTest extends NettyStreamTestBase { } @Test - public void cancelShouldNotSendCommandIfStreamNotCreated() { + public void cancelShouldStillSendCommandIfStreamNotCreatedToCancelCreation() { stream().cancel(); - verify(channel, never()).writeAndFlush(any(CancelStreamCommand.class)); + verify(channel).writeAndFlush(any(CancelStreamCommand.class)); } @Test