netty: Fix receipt of ClosedChannelException instead of actual error. Fixes #1330.

Our API allows pings to be send even after the transport has been shutdown. We currently
don't handle the case, where the Netty channel has been closed but the NettyClientHandler
has not yet been removed from the pipeline, correctly. That is, we need to query the shutdown
status whenever we receive a ClosedChannelException.

Also, some cleanup.
This commit is contained in:
Jakob Buchgraber 2016-09-12 19:17:34 +02:00 committed by GitHub
parent 8c18a0d355
commit 4aadf550ee
5 changed files with 21 additions and 20 deletions

View File

@ -174,7 +174,7 @@ public class Http2Ping {
this.callbacks = null; this.callbacks = null;
} }
for (Map.Entry<ClientTransport.PingCallback, Executor> entry : callbacks.entrySet()) { for (Map.Entry<ClientTransport.PingCallback, Executor> entry : callbacks.entrySet()) {
doExecute(entry.getValue(), asRunnable(entry.getKey(), failureCause)); notifyFailed(entry.getKey(), entry.getValue(), failureCause);
} }
} }

View File

@ -79,6 +79,7 @@ import io.netty.handler.codec.http2.Http2StreamVisitor;
import io.netty.handler.codec.http2.StreamBufferingEncoder; import io.netty.handler.codec.http2.StreamBufferingEncoder;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import java.nio.channels.ClosedChannelException;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.logging.Level; import java.util.logging.Level;
@ -472,7 +473,15 @@ class NettyClientHandler extends AbstractNettyHandler {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
finalPing.failed(future.cause()); Throwable cause = future.cause();
if (cause instanceof ClosedChannelException) {
cause = lifecycleManager.getShutdownThrowable();
if (cause == null) {
cause = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
.withCause(future.cause()).asException();
}
}
finalPing.failed(cause);
if (ping == finalPing) { if (ping == finalPing) {
ping = null; ping = null;
} }

View File

@ -99,6 +99,8 @@ class NettyClientTransport implements ConnectionClientTransport {
@Override @Override
public void ping(final PingCallback callback, final Executor executor) { public void ping(final PingCallback callback, final Executor executor) {
// The promise and listener always succeed in NettyClientHandler. So this listener handles the
// error case, when the channel is closed and the NettyClientHandler no longer in the pipeline.
ChannelFutureListener failureListener = new ChannelFutureListener() { ChannelFutureListener failureListener = new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
@ -234,17 +236,17 @@ class NettyClientTransport implements ConnectionClientTransport {
* Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed * Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed
* from the pipeline when the channel is closed. Since handlers are removed, you may get an * from the pipeline when the channel is closed. Since handlers are removed, you may get an
* unhelpful exception like ClosedChannelException. * unhelpful exception like ClosedChannelException.
*
* <p>This method must only be called on the event loop.
*/ */
private Status statusFromFailedFuture(ChannelFuture f) { private Status statusFromFailedFuture(ChannelFuture f) {
Throwable t = f.cause(); Throwable t = f.cause();
if (t instanceof ClosedChannelException) { if (t instanceof ClosedChannelException) {
synchronized (this) { Status shutdownStatus = lifecycleManager.getShutdownStatus();
Status shutdownStatus = lifecycleManager.getShutdownStatus(); if (shutdownStatus == null) {
if (shutdownStatus == null) { return Status.UNKNOWN.withDescription("Channel closed but for unknown reason");
return Status.UNKNOWN.withDescription("Channel closed but for unknown reason");
}
return shutdownStatus;
} }
return shutdownStatus;
} }
return Utils.statusFromThrowable(t); return Utils.statusFromThrowable(t);
} }

View File

@ -158,10 +158,7 @@ class Utils {
if (s.getCode() != Status.Code.UNKNOWN) { if (s.getCode() != Status.Code.UNKNOWN) {
return s; return s;
} }
// TODO(ejona): reenable once startup races are resolved; ClosedChannelException is being seen if (t instanceof ClosedChannelException) {
// still. Some tests are asserting UNAVAILABLE and were "working" previously but are now
// detecting that our behavior is flaky. See #1330
if (false && t instanceof ClosedChannelException) {
// ClosedChannelException is used any time the Netty channel is closed. Proper error // ClosedChannelException is used any time the Netty channel is closed. Proper error
// processing requires remembering the error that occurred before this one and using it // processing requires remembering the error that occurred before this one and using it
// instead. // instead.

View File

@ -52,7 +52,6 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
@ -459,13 +458,7 @@ public abstract class AbstractTransportTest {
} }
verify(mockPingCallback, timeout(TIMEOUT_MS)).onFailure(throwableCaptor.capture()); verify(mockPingCallback, timeout(TIMEOUT_MS)).onFailure(throwableCaptor.capture());
Status status = Status.fromThrowable(throwableCaptor.getValue()); Status status = Status.fromThrowable(throwableCaptor.getValue());
// TODO(buchgr): Remove once https://github.com/grpc/grpc-java/issues/1330 is resolved. assertCodeEquals(Status.UNAVAILABLE, status);
String stackTrace = "";
if (Status.UNAVAILABLE.getCode() != status.getCode()
&& status.getCause() != null) {
stackTrace = Throwables.getStackTraceAsString(status.getCause());
}
assertCodeEquals(stackTrace, Status.UNAVAILABLE, status);
} }
@Test @Test