diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 98a71e5976..666a86f955 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -60,6 +60,7 @@ public abstract class AbstractClientStream extends AbstractStream private Status status; private Metadata trailers; private Runnable closeListenerTask; + private volatile boolean cancelled; protected AbstractClientStream(WritableBufferAllocator bufferAllocator, ClientStreamListener listener, @@ -293,11 +294,16 @@ public abstract class AbstractClientStream extends AbstractStream @Override public final void cancel(Status reason) { checkArgument(CANCEL_REASONS.contains(reason.getCode()), "Invalid cancellation reason"); - outboundPhase(Phase.STATUS); + cancelled = true; sendCancel(reason); dispose(); } + @Override + public final boolean isReady() { + return !cancelled && super.isReady(); + } + /** * Cancel the stream and send a stream cancellation message to the remote server, if necessary. * Can be called by either the application or transport layers. This method is safe to be called diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index d209fa4cc5..3611ef251b 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -176,7 +176,7 @@ public abstract class AbstractStream implements Stream { } @Override - public final boolean isReady() { + public boolean isReady() { if (listener() != null && outboundPhase() != Phase.STATUS) { synchronized (onReadyLock) { return allocated && numSentBytesQueued < onReadyThreshold;