Create a No-op stream in ClientCallImpl.start() if the Context has been cancelled.

This commit is contained in:
Xudong Ma 2015-12-04 13:25:04 -08:00
parent f83a6b8761
commit a7398dba38
5 changed files with 93 additions and 82 deletions

View File

@ -40,6 +40,7 @@ import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport; import io.grpc.internal.ClientTransport;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.NoopClientStream;
import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransport;
@ -452,41 +453,5 @@ class InProcessTransport implements ServerTransport, ClientTransport {
// noop // noop
} }
} }
}
private static class NoopClientStream implements ClientStream {
@Override
public void request(int numMessages) {}
@Override
public void writeMessage(InputStream message) {}
@Override
public void flush() {}
@Override
public boolean isReady() {
return false;
}
@Override
public void cancel(Status status) {}
@Override
public void halfClose() {}
@Override
public void setCompressor(Compressor c) {
// very much a nop
}
@Override
public void setDecompressionRegistry(DecompressorRegistry registry) {}
@Override
public void setMessageCompression(boolean enable) {
// noop
}
} }
} }

View File

@ -164,8 +164,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
Preconditions.checkState(stream == null, "Already started"); Preconditions.checkState(stream == null, "Already started");
if (context.isCancelled()) { if (context.isCancelled()) {
// Context is already cancelled so no need to create a stream, just notify the observer of // Context is already cancelled so no need to create a real stream, just notify the observer
// cancellation via callback on the executor // of cancellation via callback on the executor
stream = NoopClientStream.INSTANCE;
callExecutor.execute(new ContextRunnable(context) { callExecutor.execute(new ContextRunnable(context) {
@Override @Override
public void runInContext() { public void runInContext() {

View File

@ -33,8 +33,6 @@ package io.grpc.internal;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Compressor; import io.grpc.Compressor;
import io.grpc.DecompressorRegistry; import io.grpc.DecompressorRegistry;
import io.grpc.Metadata; import io.grpc.Metadata;
@ -94,9 +92,9 @@ class DelayedStream implements ClientStream {
/** /**
* Creates a stream on a presumably usable transport. Must not be called if {@link * Creates a stream on a presumably usable transport. Must not be called if {@link
* cancelledPrematurely}, as there is no way to close {@code stream} without double-calling {@code * #cancelledPrematurely}, as there is no way to close {@code stream} without double-calling
* listener}. Most callers of this method will need to acquire the intrinsic lock to check {@code * {@code listener}. Most callers of this method will need to acquire the intrinsic lock to check
* cancelledPrematurely} and this method atomically. * {@code cancelledPrematurely} and this method atomically.
*/ */
void setStream(ClientStream stream) { void setStream(ClientStream stream) {
synchronized (this) { synchronized (this) {
@ -137,7 +135,7 @@ class DelayedStream implements ClientStream {
void maybeClosePrematurely(final Status reason) { void maybeClosePrematurely(final Status reason) {
synchronized (this) { synchronized (this) {
if (realStream == null) { if (realStream == null) {
realStream = NOOP_CLIENT_STREAM; realStream = NoopClientStream.INSTANCE;
listener.closed(reason, new Metadata()); listener.closed(reason, new Metadata());
} }
} }
@ -145,7 +143,7 @@ class DelayedStream implements ClientStream {
public boolean cancelledPrematurely() { public boolean cancelledPrematurely() {
synchronized (this) { synchronized (this) {
return realStream == NOOP_CLIENT_STREAM; return realStream == NoopClientStream.INSTANCE;
} }
} }
@ -249,40 +247,4 @@ class DelayedStream implements ClientStream {
} }
} }
} }
@VisibleForTesting
static final ClientStream NOOP_CLIENT_STREAM = new ClientStream() {
@Override public void writeMessage(InputStream message) {}
@Override public void flush() {}
@Override public void cancel(Status reason) {}
@Override public void halfClose() {}
@Override public void request(int numMessages) {}
@Override public void setCompressor(Compressor c) {}
@Override
public void setMessageCompression(boolean enable) {
// noop
}
/**
* Always returns {@code false}, since this is only used when the startup of the {@link
* ClientCall} fails (i.e. the {@link ClientCall} is closed).
*/
@Override public boolean isReady() {
return false;
}
@Override
public void setDecompressionRegistry(DecompressorRegistry registry) {}
@Override
public String toString() {
return "NOOP_CLIENT_STREAM";
}
};
} }

View File

@ -0,0 +1,78 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import io.grpc.Compressor;
import io.grpc.DecompressorRegistry;
import io.grpc.Status;
import java.io.InputStream;
/**
* An implementation of {@link ClientStream} that silently does nothing for the operations.
*/
public class NoopClientStream implements ClientStream {
public static NoopClientStream INSTANCE = new NoopClientStream();
@Override
public void request(int numMessages) {}
@Override
public void writeMessage(InputStream message) {}
@Override
public void flush() {}
@Override
public boolean isReady() {
return false;
}
@Override
public void cancel(Status status) {}
@Override
public void halfClose() {}
@Override
public void setCompressor(Compressor c) {
// very much a nop
}
@Override
public void setDecompressionRegistry(DecompressorRegistry registry) {}
@Override
public void setMessageCompression(boolean enable) {
// noop
}
}

View File

@ -425,6 +425,11 @@ public class ClientCallImplTest {
assertEquals(Status.Code.CANCELLED, status.getCode()); assertEquals(Status.Code.CANCELLED, status.getCode());
assertSame(cause, status.getCause()); assertSame(cause, status.getCause());
// Following operations should be no-op.
call.request(1);
call.sendMessage(null);
call.halfClose();
// Stream should never be created. // Stream should never be created.
verifyZeroInteractions(transport); verifyZeroInteractions(transport);
@ -489,7 +494,7 @@ public class ClientCallImplTest {
StreamCreationTask task = StreamCreationTask task =
new StreamCreationTask(delayedStream, headers, method, CallOptions.DEFAULT, streamListener); new StreamCreationTask(delayedStream, headers, method, CallOptions.DEFAULT, streamListener);
when(clientTransport.newStream(method, headers, streamListener)) when(clientTransport.newStream(method, headers, streamListener))
.thenReturn(DelayedStream.NOOP_CLIENT_STREAM); .thenReturn(NoopClientStream.INSTANCE);
task.onSuccess(clientTransport); task.onSuccess(clientTransport);