From a7398dba38f514d111e2480445c9aa1924684233 Mon Sep 17 00:00:00 2001 From: Xudong Ma Date: Fri, 4 Dec 2015 13:25:04 -0800 Subject: [PATCH] Create a No-op stream in ClientCallImpl.start() if the Context has been cancelled. --- .../io/grpc/inprocess/InProcessTransport.java | 37 +-------- .../java/io/grpc/internal/ClientCallImpl.java | 5 +- .../java/io/grpc/internal/DelayedStream.java | 48 ++---------- .../io/grpc/internal/NoopClientStream.java | 78 +++++++++++++++++++ .../io/grpc/internal/ClientCallImplTest.java | 7 +- 5 files changed, 93 insertions(+), 82 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/NoopClientStream.java diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 20104ba3e0..6971d1e894 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -40,6 +40,7 @@ import io.grpc.internal.ClientStream; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientTransport; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.NoopClientStream; import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransport; @@ -452,41 +453,5 @@ class InProcessTransport implements ServerTransport, ClientTransport { // noop } } - - } - - private static class NoopClientStream implements ClientStream { - @Override - public void request(int numMessages) {} - - @Override - public void writeMessage(InputStream message) {} - - @Override - public void flush() {} - - @Override - public boolean isReady() { - return false; - } - - @Override - public void cancel(Status status) {} - - @Override - public void halfClose() {} - - @Override - public void setCompressor(Compressor c) { - // very much a nop - } - - @Override - public void setDecompressionRegistry(DecompressorRegistry registry) {} - - @Override - public void setMessageCompression(boolean enable) { - // noop - } } } diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 365d4fdbde..ce16c0d440 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -164,8 +164,9 @@ final class ClientCallImpl extends ClientCall Preconditions.checkState(stream == null, "Already started"); if (context.isCancelled()) { - // Context is already cancelled so no need to create a stream, just notify the observer of - // cancellation via callback on the executor + // Context is already cancelled so no need to create a real stream, just notify the observer + // of cancellation via callback on the executor + stream = NoopClientStream.INSTANCE; callExecutor.execute(new ContextRunnable(context) { @Override public void runInContext() { diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index abba1b2ef2..616cce6573 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -33,8 +33,6 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkState; -import com.google.common.annotations.VisibleForTesting; - import io.grpc.Compressor; import io.grpc.DecompressorRegistry; import io.grpc.Metadata; @@ -94,9 +92,9 @@ class DelayedStream implements ClientStream { /** * Creates a stream on a presumably usable transport. Must not be called if {@link - * cancelledPrematurely}, as there is no way to close {@code stream} without double-calling {@code - * listener}. Most callers of this method will need to acquire the intrinsic lock to check {@code - * cancelledPrematurely} and this method atomically. + * #cancelledPrematurely}, as there is no way to close {@code stream} without double-calling + * {@code listener}. Most callers of this method will need to acquire the intrinsic lock to check + * {@code cancelledPrematurely} and this method atomically. */ void setStream(ClientStream stream) { synchronized (this) { @@ -137,7 +135,7 @@ class DelayedStream implements ClientStream { void maybeClosePrematurely(final Status reason) { synchronized (this) { if (realStream == null) { - realStream = NOOP_CLIENT_STREAM; + realStream = NoopClientStream.INSTANCE; listener.closed(reason, new Metadata()); } } @@ -145,7 +143,7 @@ class DelayedStream implements ClientStream { public boolean cancelledPrematurely() { synchronized (this) { - return realStream == NOOP_CLIENT_STREAM; + return realStream == NoopClientStream.INSTANCE; } } @@ -249,40 +247,4 @@ class DelayedStream implements ClientStream { } } } - - @VisibleForTesting - static final ClientStream NOOP_CLIENT_STREAM = new ClientStream() { - @Override public void writeMessage(InputStream message) {} - - @Override public void flush() {} - - @Override public void cancel(Status reason) {} - - @Override public void halfClose() {} - - @Override public void request(int numMessages) {} - - @Override public void setCompressor(Compressor c) {} - - @Override - public void setMessageCompression(boolean enable) { - // noop - } - - /** - * Always returns {@code false}, since this is only used when the startup of the {@link - * ClientCall} fails (i.e. the {@link ClientCall} is closed). - */ - @Override public boolean isReady() { - return false; - } - - @Override - public void setDecompressionRegistry(DecompressorRegistry registry) {} - - @Override - public String toString() { - return "NOOP_CLIENT_STREAM"; - } - }; } diff --git a/core/src/main/java/io/grpc/internal/NoopClientStream.java b/core/src/main/java/io/grpc/internal/NoopClientStream.java new file mode 100644 index 0000000000..4194aae6f0 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/NoopClientStream.java @@ -0,0 +1,78 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import io.grpc.Compressor; +import io.grpc.DecompressorRegistry; +import io.grpc.Status; + +import java.io.InputStream; + +/** + * An implementation of {@link ClientStream} that silently does nothing for the operations. + */ +public class NoopClientStream implements ClientStream { + public static NoopClientStream INSTANCE = new NoopClientStream(); + + @Override + public void request(int numMessages) {} + + @Override + public void writeMessage(InputStream message) {} + + @Override + public void flush() {} + + @Override + public boolean isReady() { + return false; + } + + @Override + public void cancel(Status status) {} + + @Override + public void halfClose() {} + + @Override + public void setCompressor(Compressor c) { + // very much a nop + } + + @Override + public void setDecompressionRegistry(DecompressorRegistry registry) {} + + @Override + public void setMessageCompression(boolean enable) { + // noop + } +} diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index efae55337c..324bcc8113 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -425,6 +425,11 @@ public class ClientCallImplTest { assertEquals(Status.Code.CANCELLED, status.getCode()); assertSame(cause, status.getCause()); + // Following operations should be no-op. + call.request(1); + call.sendMessage(null); + call.halfClose(); + // Stream should never be created. verifyZeroInteractions(transport); @@ -489,7 +494,7 @@ public class ClientCallImplTest { StreamCreationTask task = new StreamCreationTask(delayedStream, headers, method, CallOptions.DEFAULT, streamListener); when(clientTransport.newStream(method, headers, streamListener)) - .thenReturn(DelayedStream.NOOP_CLIENT_STREAM); + .thenReturn(NoopClientStream.INSTANCE); task.onSuccess(clientTransport);