Merge DelayedStream's setError() into cancel()

DelayedClientTransport.PendingStream will override cancel(), which has a
clearer semantic.

Also permitting all status codes except OK in ClientStream.cancel(),
instead of just 4 codes.
This commit is contained in:
Kun Zhang 2016-02-23 16:20:46 -08:00
parent a959126fb3
commit d86dfc9552
9 changed files with 77 additions and 71 deletions

View File

@ -34,7 +34,6 @@ package io.grpc.internal;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.CANCEL_REASONS;
import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -292,7 +291,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
*/ */
@Override @Override
public final void cancel(Status reason) { public final void cancel(Status reason) {
checkArgument(CANCEL_REASONS.contains(reason.getCode()), "Invalid cancellation reason"); checkArgument(!reason.isOk(), "Should not cancel with OK status");
cancelled = true; cancelled = true;
sendCancel(reason); sendCancel(reason);
dispose(); dispose();

View File

@ -46,9 +46,7 @@ public interface ClientStream extends Stream {
* period until {@link ClientStreamListener#closed} is called. This method is safe to be called * period until {@link ClientStreamListener#closed} is called. This method is safe to be called
* at any time and multiple times and from any thread. * at any time and multiple times and from any thread.
* *
* @param reason must have {@link io.grpc.Status.Code#CANCELLED}, * @param reason must be non-OK
* {@link io.grpc.Status.Code#DEADLINE_EXCEEDED}, {@link io.grpc.Status.Code#INTERNAL},
* or {@link io.grpc.Status.Code#UNKNOWN}
*/ */
void cancel(Status reason); void cancel(Status reason);

View File

@ -95,9 +95,7 @@ class DelayedClientTransport implements ManagedClientTransport {
return pendingStream; return pendingStream;
} }
} }
DelayedStream stream = new DelayedStream(); return new FailingClientStream(Status.UNAVAILABLE.withDescription("transport shutdown"));
stream.setError(Status.UNAVAILABLE.withDescription("transport shutdown"));
return stream;
} }
@Override @Override
@ -164,7 +162,7 @@ class DelayedClientTransport implements ManagedClientTransport {
} }
if (savedPendingStreams != null) { if (savedPendingStreams != null) {
for (PendingStream stream : savedPendingStreams) { for (PendingStream stream : savedPendingStreams) {
stream.setError(status); stream.cancel(status);
} }
listener.transportTerminated(); listener.transportTerminated();
} }
@ -245,10 +243,9 @@ class DelayedClientTransport implements ManagedClientTransport {
setStream(transport.newStream(method, headers)); setStream(transport.newStream(method, headers));
} }
// TODO(zhangkun83): DelayedStream.setError() doesn't have a clearly-defined semantic to be
// overriden. Make it clear or find another method to override.
@Override @Override
void setError(Status reason) { public void cancel(Status reason) {
super.cancel(reason);
synchronized (lock) { synchronized (lock) {
if (pendingStreams != null) { if (pendingStreams != null) {
pendingStreams.remove(this); pendingStreams.remove(this);
@ -258,7 +255,6 @@ class DelayedClientTransport implements ManagedClientTransport {
} }
} }
} }
super.setError(reason);
} }
} }

View File

@ -34,6 +34,8 @@ package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Preconditions;
import io.grpc.Compressor; import io.grpc.Compressor;
import io.grpc.Decompressor; import io.grpc.Decompressor;
import io.grpc.Metadata; import io.grpc.Metadata;
@ -53,8 +55,6 @@ import javax.annotation.concurrent.GuardedBy;
* DelayedStream} may be internally altered by different threads, thus internal synchronization is * DelayedStream} may be internally altered by different threads, thus internal synchronization is
* necessary. * necessary.
*/ */
// TODO(zhangkun83): merge it with DelayedClientTransport.PendingStream as it will be no longer
// needed by ClientCallImpl as we move away from ListenableFuture<ClientTransport>
class DelayedStream implements ClientStream { class DelayedStream implements ClientStream {
// set to non null once both listener and realStream are valid. After this point it is safe // set to non null once both listener and realStream are valid. After this point it is safe
@ -163,13 +163,16 @@ class DelayedStream implements ClientStream {
startedRealStream = realStream; startedRealStream = realStream;
} }
void setStream(ClientStream stream) { /**
* Transfers all pending and future requests and mutations to the given stream.
*
* <p>No-op if either this method or {@link #cancel} have already been called.
*/
final void setStream(ClientStream stream) {
synchronized (this) { synchronized (this) {
if (error != null) { if (error != null || realStream != null) {
// If there is an error, unstartedStream will be a Noop.
return; return;
} }
checkState(realStream == null, "Stream already created: %s", realStream);
realStream = checkNotNull(stream, "stream"); realStream = checkNotNull(stream, "stream");
// listener can only be non-null if start has already been called. // listener can only be non-null if start has already been called.
if (listener != null) { if (listener != null) {
@ -178,21 +181,6 @@ class DelayedStream implements ClientStream {
} }
} }
void setError(Status reason) {
synchronized (this) {
// If the client has already cancelled the stream don't bother keeping the next error.
if (error == null) {
error = checkNotNull(reason);
realStream = NoopClientStream.INSTANCE;
if (listener != null) {
listener.closed(error, new Metadata());
// call startStream anyways to drain pending messages.
startStream();
}
}
}
}
@Override @Override
public void writeMessage(InputStream message) { public void writeMessage(InputStream message) {
if (startedRealStream == null) { if (startedRealStream == null) {
@ -221,15 +209,34 @@ class DelayedStream implements ClientStream {
@Override @Override
public void cancel(Status reason) { public void cancel(Status reason) {
if (startedRealStream == null) { // At least one of them is null.
ClientStream streamToBeCancelled = startedRealStream;
ClientStreamListener listenerToBeCalled = null;
if (streamToBeCancelled == null) {
synchronized (this) { synchronized (this) {
if (startedRealStream == null) { if (realStream != null) {
setError(reason); // realStream already set. Just cancel it.
return; streamToBeCancelled = realStream;
} else if (error == null) {
// Neither realStream and error are set. Will set the error and call the listener if
// it's set.
error = checkNotNull(reason);
realStream = NoopClientStream.INSTANCE;
if (listener != null) {
// call startStream anyways to drain pending messages.
startStream();
listenerToBeCalled = listener;
}
} // else: error already set, do nothing.
} }
} }
if (listenerToBeCalled != null) {
Preconditions.checkState(streamToBeCancelled == null, "unexpected streamToBeCancelled");
listenerToBeCalled.closed(reason, new Metadata());
}
if (streamToBeCancelled != null) {
streamToBeCancelled.cancel(reason);
} }
startedRealStream.cancel(reason);
} }
@Override @Override

View File

@ -32,8 +32,6 @@
package io.grpc.internal; package io.grpc.internal;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static io.grpc.Status.Code.CANCELLED;
import static io.grpc.Status.Code.DEADLINE_EXCEEDED;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
@ -51,9 +49,7 @@ import java.lang.reflect.Method;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -146,12 +142,6 @@ public final class GrpcUtil {
*/ */
public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 8192; public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 8192;
/**
* The set of valid status codes for client cancellation.
*/
public static final Set<Status.Code> CANCEL_REASONS =
EnumSet.of(CANCELLED, DEADLINE_EXCEEDED, Status.Code.INTERNAL, Status.Code.UNKNOWN);
public static final Splitter ACCEPT_ENCODING_SPLITER = Splitter.on(',').trimResults(); public static final Splitter ACCEPT_ENCODING_SPLITER = Splitter.on(',').trimResults();
public static final Joiner ACCEPT_ENCODING_JOINER = Joiner.on(','); public static final Joiner ACCEPT_ENCODING_JOINER = Joiner.on(',');

View File

@ -83,13 +83,12 @@ public class AbstractClientStreamTest {
}; };
@Test @Test
public void cancel_onlyExpectedCodesAccepted() { public void cancel_doNotAcceptOk() {
for (Code code : Code.values()) { for (Code code : Code.values()) {
ClientStreamListener listener = new BaseClientStreamListener(); ClientStreamListener listener = new BaseClientStreamListener();
AbstractClientStream<Integer> stream = new BaseAbstractClientStream<Integer>(allocator); AbstractClientStream<Integer> stream = new BaseAbstractClientStream<Integer>(allocator);
stream.start(listener); stream.start(listener);
if (code == Code.DEADLINE_EXCEEDED || code == Code.CANCELLED || code == Code.INTERNAL if (code != Code.OK) {
|| code == Code.UNKNOWN) {
stream.cancel(Status.fromCodeValue(code.value())); stream.cancel(Status.fromCodeValue(code.value()));
} else { } else {
try { try {

View File

@ -109,7 +109,6 @@ public class ClientCallImplTest {
@Mock private ClientStreamListener streamListener; @Mock private ClientStreamListener streamListener;
@Mock private ClientTransport clientTransport; @Mock private ClientTransport clientTransport;
@Mock private DelayedStream delayedStream;
@Captor private ArgumentCaptor<Status> statusCaptor; @Captor private ArgumentCaptor<Status> statusCaptor;
@Mock @Mock

View File

@ -33,6 +33,7 @@ package io.grpc.internal;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -138,21 +139,41 @@ public class DelayedStreamTest {
} }
@Test @Test
public void setStream_cantCreateTwice() { public void startThenCancelled() {
stream.start(listener);
// The first call will be a success
stream.setStream(realStream);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Stream already created");
stream.setStream(realStream);
}
@Test
public void streamCancelled() {
stream.start(listener); stream.start(listener);
stream.cancel(Status.CANCELLED); stream.cancel(Status.CANCELLED);
verify(listener).closed(eq(Status.CANCELLED), isA(Metadata.class)); verify(listener).closed(eq(Status.CANCELLED), isA(Metadata.class));
} }
@Test
public void startThenSetStreamThenCancelled() {
stream.start(listener);
stream.setStream(realStream);
stream.cancel(Status.CANCELLED);
verify(realStream).start(same(listener));
verify(realStream).cancel(same(Status.CANCELLED));
}
@Test
public void setStreamThenStartThenCancelled() {
stream.setStream(realStream);
stream.start(listener);
stream.cancel(Status.CANCELLED);
verify(realStream).start(same(listener));
verify(realStream).cancel(same(Status.CANCELLED));
}
@Test
public void setStreamThenCancelled() {
stream.setStream(realStream);
stream.cancel(Status.CANCELLED);
verify(realStream).cancel(same(Status.CANCELLED));
}
@Test
public void cancelledThenStart() {
stream.cancel(Status.CANCELLED);
stream.start(listener);
verify(listener).closed(eq(Status.CANCELLED), isA(Metadata.class));
}
} }

View File

@ -31,8 +31,6 @@
package io.grpc.netty; package io.grpc.netty;
import static io.grpc.internal.GrpcUtil.CANCEL_REASONS;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.grpc.Status; import io.grpc.Status;
@ -47,8 +45,7 @@ class CancelClientStreamCommand {
CancelClientStreamCommand(NettyClientStream stream, Status reason) { CancelClientStreamCommand(NettyClientStream stream, Status reason) {
this.stream = Preconditions.checkNotNull(stream, "stream"); this.stream = Preconditions.checkNotNull(stream, "stream");
Preconditions.checkNotNull(reason); Preconditions.checkNotNull(reason);
Preconditions.checkArgument(CANCEL_REASONS.contains(reason.getCode()), Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
"Invalid cancellation reason");
this.reason = reason; this.reason = reason;
} }