diff --git a/core/src/main/java/io/grpc/internal/Http2Ping.java b/core/src/main/java/io/grpc/internal/Http2Ping.java index 2b4df888af..550e7bd36a 100644 --- a/core/src/main/java/io/grpc/internal/Http2Ping.java +++ b/core/src/main/java/io/grpc/internal/Http2Ping.java @@ -174,7 +174,7 @@ public class Http2Ping { this.callbacks = null; } for (Map.Entry entry : callbacks.entrySet()) { - doExecute(entry.getValue(), asRunnable(entry.getKey(), failureCause)); + notifyFailed(entry.getKey(), entry.getValue(), failureCause); } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 054d79e59a..3810a0acf3 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -79,6 +79,7 @@ import io.netty.handler.codec.http2.Http2StreamVisitor; import io.netty.handler.codec.http2.StreamBufferingEncoder; import io.netty.handler.logging.LogLevel; +import java.nio.channels.ClosedChannelException; import java.util.Random; import java.util.concurrent.Executor; import java.util.logging.Level; @@ -472,7 +473,15 @@ class NettyClientHandler extends AbstractNettyHandler { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - finalPing.failed(future.cause()); + Throwable cause = future.cause(); + if (cause instanceof ClosedChannelException) { + cause = lifecycleManager.getShutdownThrowable(); + if (cause == null) { + cause = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.") + .withCause(future.cause()).asException(); + } + } + finalPing.failed(cause); if (ping == finalPing) { ping = null; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 76fffc80e0..9cec1fa07e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -99,6 +99,8 @@ class NettyClientTransport implements ConnectionClientTransport { @Override public void ping(final PingCallback callback, final Executor executor) { + // The promise and listener always succeed in NettyClientHandler. So this listener handles the + // error case, when the channel is closed and the NettyClientHandler no longer in the pipeline. ChannelFutureListener failureListener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -234,17 +236,17 @@ class NettyClientTransport implements ConnectionClientTransport { * Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed * from the pipeline when the channel is closed. Since handlers are removed, you may get an * unhelpful exception like ClosedChannelException. + * + *

This method must only be called on the event loop. */ private Status statusFromFailedFuture(ChannelFuture f) { Throwable t = f.cause(); if (t instanceof ClosedChannelException) { - synchronized (this) { - Status shutdownStatus = lifecycleManager.getShutdownStatus(); - if (shutdownStatus == null) { - return Status.UNKNOWN.withDescription("Channel closed but for unknown reason"); - } - return shutdownStatus; + Status shutdownStatus = lifecycleManager.getShutdownStatus(); + if (shutdownStatus == null) { + return Status.UNKNOWN.withDescription("Channel closed but for unknown reason"); } + return shutdownStatus; } return Utils.statusFromThrowable(t); } diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index e68f693ba4..1b8c47c354 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -158,10 +158,7 @@ class Utils { if (s.getCode() != Status.Code.UNKNOWN) { return s; } - // TODO(ejona): reenable once startup races are resolved; ClosedChannelException is being seen - // still. Some tests are asserting UNAVAILABLE and were "working" previously but are now - // detecting that our behavior is flaky. See #1330 - if (false && t instanceof ClosedChannelException) { + if (t instanceof ClosedChannelException) { // ClosedChannelException is used any time the Netty channel is closed. Proper error // processing requires remembering the error that occurred before this one and using it // instead. diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 6e83ee14c3..a501a80af1 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -52,7 +52,6 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; -import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.MoreExecutors; @@ -459,13 +458,7 @@ public abstract class AbstractTransportTest { } verify(mockPingCallback, timeout(TIMEOUT_MS)).onFailure(throwableCaptor.capture()); Status status = Status.fromThrowable(throwableCaptor.getValue()); - // TODO(buchgr): Remove once https://github.com/grpc/grpc-java/issues/1330 is resolved. - String stackTrace = ""; - if (Status.UNAVAILABLE.getCode() != status.getCode() - && status.getCause() != null) { - stackTrace = Throwables.getStackTraceAsString(status.getCause()); - } - assertCodeEquals(stackTrace, Status.UNAVAILABLE, status); + assertCodeEquals(Status.UNAVAILABLE, status); } @Test