From d86dfc9552e18526d5715253d32b85d5e79bd92b Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Tue, 23 Feb 2016 16:20:46 -0800 Subject: [PATCH] Merge DelayedStream's setError() into cancel() DelayedClientTransport.PendingStream will override cancel(), which has a clearer semantic. Also permitting all status codes except OK in ClientStream.cancel(), instead of just 4 codes. --- .../grpc/internal/AbstractClientStream.java | 3 +- .../java/io/grpc/internal/ClientStream.java | 4 +- .../grpc/internal/DelayedClientTransport.java | 12 ++-- .../java/io/grpc/internal/DelayedStream.java | 61 +++++++++++-------- .../main/java/io/grpc/internal/GrpcUtil.java | 10 --- .../internal/AbstractClientStreamTest.java | 5 +- .../io/grpc/internal/ClientCallImplTest.java | 1 - .../io/grpc/internal/DelayedStreamTest.java | 47 ++++++++++---- .../grpc/netty/CancelClientStreamCommand.java | 5 +- 9 files changed, 77 insertions(+), 71 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 563cf6c40f..2feb1b7e0c 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -34,7 +34,6 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static io.grpc.internal.GrpcUtil.CANCEL_REASONS; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -292,7 +291,7 @@ public abstract class AbstractClientStream extends AbstractStream */ @Override public final void cancel(Status reason) { - checkArgument(CANCEL_REASONS.contains(reason.getCode()), "Invalid cancellation reason"); + checkArgument(!reason.isOk(), "Should not cancel with OK status"); cancelled = true; sendCancel(reason); dispose(); diff --git a/core/src/main/java/io/grpc/internal/ClientStream.java b/core/src/main/java/io/grpc/internal/ClientStream.java index ee5f5a6929..9d875a2a15 100644 --- a/core/src/main/java/io/grpc/internal/ClientStream.java +++ b/core/src/main/java/io/grpc/internal/ClientStream.java @@ -46,9 +46,7 @@ public interface ClientStream extends Stream { * period until {@link ClientStreamListener#closed} is called. This method is safe to be called * at any time and multiple times and from any thread. * - * @param reason must have {@link io.grpc.Status.Code#CANCELLED}, - * {@link io.grpc.Status.Code#DEADLINE_EXCEEDED}, {@link io.grpc.Status.Code#INTERNAL}, - * or {@link io.grpc.Status.Code#UNKNOWN} + * @param reason must be non-OK */ void cancel(Status reason); diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 0b329d1923..6b861a45e5 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -95,9 +95,7 @@ class DelayedClientTransport implements ManagedClientTransport { return pendingStream; } } - DelayedStream stream = new DelayedStream(); - stream.setError(Status.UNAVAILABLE.withDescription("transport shutdown")); - return stream; + return new FailingClientStream(Status.UNAVAILABLE.withDescription("transport shutdown")); } @Override @@ -164,7 +162,7 @@ class DelayedClientTransport implements ManagedClientTransport { } if (savedPendingStreams != null) { for (PendingStream stream : savedPendingStreams) { - stream.setError(status); + stream.cancel(status); } listener.transportTerminated(); } @@ -245,10 +243,9 @@ class DelayedClientTransport implements ManagedClientTransport { setStream(transport.newStream(method, headers)); } - // TODO(zhangkun83): DelayedStream.setError() doesn't have a clearly-defined semantic to be - // overriden. Make it clear or find another method to override. @Override - void setError(Status reason) { + public void cancel(Status reason) { + super.cancel(reason); synchronized (lock) { if (pendingStreams != null) { pendingStreams.remove(this); @@ -258,7 +255,6 @@ class DelayedClientTransport implements ManagedClientTransport { } } } - super.setError(reason); } } diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index a18f7a8b60..c6152936b2 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -34,6 +34,8 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Preconditions; + import io.grpc.Compressor; import io.grpc.Decompressor; import io.grpc.Metadata; @@ -53,8 +55,6 @@ import javax.annotation.concurrent.GuardedBy; * DelayedStream} may be internally altered by different threads, thus internal synchronization is * necessary. */ -// TODO(zhangkun83): merge it with DelayedClientTransport.PendingStream as it will be no longer -// needed by ClientCallImpl as we move away from ListenableFuture class DelayedStream implements ClientStream { // set to non null once both listener and realStream are valid. After this point it is safe @@ -163,13 +163,16 @@ class DelayedStream implements ClientStream { startedRealStream = realStream; } - void setStream(ClientStream stream) { + /** + * Transfers all pending and future requests and mutations to the given stream. + * + *

No-op if either this method or {@link #cancel} have already been called. + */ + final void setStream(ClientStream stream) { synchronized (this) { - if (error != null) { - // If there is an error, unstartedStream will be a Noop. + if (error != null || realStream != null) { return; } - checkState(realStream == null, "Stream already created: %s", realStream); realStream = checkNotNull(stream, "stream"); // listener can only be non-null if start has already been called. if (listener != null) { @@ -178,21 +181,6 @@ class DelayedStream implements ClientStream { } } - void setError(Status reason) { - synchronized (this) { - // If the client has already cancelled the stream don't bother keeping the next error. - if (error == null) { - error = checkNotNull(reason); - realStream = NoopClientStream.INSTANCE; - if (listener != null) { - listener.closed(error, new Metadata()); - // call startStream anyways to drain pending messages. - startStream(); - } - } - } - } - @Override public void writeMessage(InputStream message) { if (startedRealStream == null) { @@ -221,15 +209,34 @@ class DelayedStream implements ClientStream { @Override public void cancel(Status reason) { - if (startedRealStream == null) { + // At least one of them is null. + ClientStream streamToBeCancelled = startedRealStream; + ClientStreamListener listenerToBeCalled = null; + if (streamToBeCancelled == null) { synchronized (this) { - if (startedRealStream == null) { - setError(reason); - return; - } + if (realStream != null) { + // realStream already set. Just cancel it. + streamToBeCancelled = realStream; + } else if (error == null) { + // Neither realStream and error are set. Will set the error and call the listener if + // it's set. + error = checkNotNull(reason); + realStream = NoopClientStream.INSTANCE; + if (listener != null) { + // call startStream anyways to drain pending messages. + startStream(); + listenerToBeCalled = listener; + } + } // else: error already set, do nothing. } } - startedRealStream.cancel(reason); + if (listenerToBeCalled != null) { + Preconditions.checkState(streamToBeCancelled == null, "unexpected streamToBeCancelled"); + listenerToBeCalled.closed(reason, new Metadata()); + } + if (streamToBeCancelled != null) { + streamToBeCancelled.cancel(reason); + } } @Override diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 8ea81b0d73..7d28bc844c 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -32,8 +32,6 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkArgument; -import static io.grpc.Status.Code.CANCELLED; -import static io.grpc.Status.Code.DEADLINE_EXCEEDED; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; @@ -51,9 +49,7 @@ import java.lang.reflect.Method; import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; -import java.util.EnumSet; import java.util.Map.Entry; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -146,12 +142,6 @@ public final class GrpcUtil { */ public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 8192; - /** - * The set of valid status codes for client cancellation. - */ - public static final Set CANCEL_REASONS = - EnumSet.of(CANCELLED, DEADLINE_EXCEEDED, Status.Code.INTERNAL, Status.Code.UNKNOWN); - public static final Splitter ACCEPT_ENCODING_SPLITER = Splitter.on(',').trimResults(); public static final Joiner ACCEPT_ENCODING_JOINER = Joiner.on(','); diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index 3bd8db5eaf..4ccb4e8cc6 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -83,13 +83,12 @@ public class AbstractClientStreamTest { }; @Test - public void cancel_onlyExpectedCodesAccepted() { + public void cancel_doNotAcceptOk() { for (Code code : Code.values()) { ClientStreamListener listener = new BaseClientStreamListener(); AbstractClientStream stream = new BaseAbstractClientStream(allocator); stream.start(listener); - if (code == Code.DEADLINE_EXCEEDED || code == Code.CANCELLED || code == Code.INTERNAL - || code == Code.UNKNOWN) { + if (code != Code.OK) { stream.cancel(Status.fromCodeValue(code.value())); } else { try { diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index 8b6d47efe5..da059b9692 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -109,7 +109,6 @@ public class ClientCallImplTest { @Mock private ClientStreamListener streamListener; @Mock private ClientTransport clientTransport; - @Mock private DelayedStream delayedStream; @Captor private ArgumentCaptor statusCaptor; @Mock diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index b35e935a1e..229a643f55 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -33,6 +33,7 @@ package io.grpc.internal; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.same; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -138,21 +139,41 @@ public class DelayedStreamTest { } @Test - public void setStream_cantCreateTwice() { - stream.start(listener); - // The first call will be a success - stream.setStream(realStream); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Stream already created"); - - stream.setStream(realStream); - } - - @Test - public void streamCancelled() { + public void startThenCancelled() { stream.start(listener); stream.cancel(Status.CANCELLED); verify(listener).closed(eq(Status.CANCELLED), isA(Metadata.class)); } + + @Test + public void startThenSetStreamThenCancelled() { + stream.start(listener); + stream.setStream(realStream); + stream.cancel(Status.CANCELLED); + verify(realStream).start(same(listener)); + verify(realStream).cancel(same(Status.CANCELLED)); + } + + @Test + public void setStreamThenStartThenCancelled() { + stream.setStream(realStream); + stream.start(listener); + stream.cancel(Status.CANCELLED); + verify(realStream).start(same(listener)); + verify(realStream).cancel(same(Status.CANCELLED)); + } + + @Test + public void setStreamThenCancelled() { + stream.setStream(realStream); + stream.cancel(Status.CANCELLED); + verify(realStream).cancel(same(Status.CANCELLED)); + } + + @Test + public void cancelledThenStart() { + stream.cancel(Status.CANCELLED); + stream.start(listener); + verify(listener).closed(eq(Status.CANCELLED), isA(Metadata.class)); + } } diff --git a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java index 351498ff5e..2540075008 100644 --- a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java +++ b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java @@ -31,8 +31,6 @@ package io.grpc.netty; -import static io.grpc.internal.GrpcUtil.CANCEL_REASONS; - import com.google.common.base.Preconditions; import io.grpc.Status; @@ -47,8 +45,7 @@ class CancelClientStreamCommand { CancelClientStreamCommand(NettyClientStream stream, Status reason) { this.stream = Preconditions.checkNotNull(stream, "stream"); Preconditions.checkNotNull(reason); - Preconditions.checkArgument(CANCEL_REASONS.contains(reason.getCode()), - "Invalid cancellation reason"); + Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status"); this.reason = reason; }