From 7c2ff632cdf4aab37dd2f88fa59ae6d902936d4d Mon Sep 17 00:00:00 2001 From: nmittler Date: Thu, 24 Sep 2015 16:22:46 -0700 Subject: [PATCH] Fixing flaky test Http2NettyTest.deadlineExceeded() This test occasionally flakes due to the way the deadline timer cancels the stream. Stream cancellation immediately closes the outbound status, disallowing the sending of further messages. The true cancellation is done sometime later in the transport thread in Netty. So there is a race between closing the outbound status and performing the actual cancellation where sent messages will fail with the wrong status. --- .../main/java/io/grpc/internal/AbstractClientStream.java | 8 +++++++- core/src/main/java/io/grpc/internal/AbstractStream.java | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 98a71e5976..666a86f955 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -60,6 +60,7 @@ public abstract class AbstractClientStream extends AbstractStream private Status status; private Metadata trailers; private Runnable closeListenerTask; + private volatile boolean cancelled; protected AbstractClientStream(WritableBufferAllocator bufferAllocator, ClientStreamListener listener, @@ -293,11 +294,16 @@ public abstract class AbstractClientStream extends AbstractStream @Override public final void cancel(Status reason) { checkArgument(CANCEL_REASONS.contains(reason.getCode()), "Invalid cancellation reason"); - outboundPhase(Phase.STATUS); + cancelled = true; sendCancel(reason); dispose(); } + @Override + public final boolean isReady() { + return !cancelled && super.isReady(); + } + /** * Cancel the stream and send a stream cancellation message to the remote server, if necessary. * Can be called by either the application or transport layers. This method is safe to be called diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index d209fa4cc5..3611ef251b 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -176,7 +176,7 @@ public abstract class AbstractStream implements Stream { } @Override - public final boolean isReady() { + public boolean isReady() { if (listener() != null && outboundPhase() != Phase.STATUS) { synchronized (onReadyLock) { return allocated && numSentBytesQueued < onReadyThreshold;