diff --git a/core/src/main/java/io/grpc/internal/ChannelExecutor.java b/core/src/main/java/io/grpc/internal/ChannelExecutor.java index af62cedced..4134df1425 100644 --- a/core/src/main/java/io/grpc/internal/ChannelExecutor.java +++ b/core/src/main/java/io/grpc/internal/ChannelExecutor.java @@ -34,7 +34,7 @@ import javax.annotation.concurrent.ThreadSafe; * order as they are submitted. */ @ThreadSafe -final class ChannelExecutor { +class ChannelExecutor { private static final Logger log = Logger.getLogger(ChannelExecutor.class.getName()); private final Object lock = new Object(); @@ -51,7 +51,7 @@ final class ChannelExecutor { *
<p>
Upon returning, it guarantees that all tasks submitted by {@code executeLater()} before it * have been or will eventually be run, while not requiring any more calls to {@code drain()}. */ - void drain() { + final void drain() { boolean drainLeaseAcquired = false; while (true) { Runnable runnable; @@ -72,7 +72,7 @@ final class ChannelExecutor { try { runnable.run(); } catch (Throwable t) { - log.log(Level.WARNING, "Runnable threw exception in ChannelExecutor", t); + handleUncaughtThrowable(t); } } } @@ -82,7 +82,7 @@ final class ChannelExecutor { * * @return this ChannelExecutor */ - ChannelExecutor executeLater(Runnable runnable) { + final ChannelExecutor executeLater(Runnable runnable) { synchronized (lock) { queue.add(checkNotNull(runnable, "runnable is null")); } @@ -90,9 +90,18 @@ final class ChannelExecutor { } @VisibleForTesting - int numPendingTasks() { + final int numPendingTasks() { synchronized (lock) { return queue.size(); } } + + /** + * Handle a throwable from a task. + * + *
<p>
The default implementation logs a warning. + */ + void handleUncaughtThrowable(Throwable t) { + log.log(Level.WARNING, "Runnable threw exception in ChannelExecutor", t); + } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index ca7a33e474..1bec23ba64 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -121,7 +121,13 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume private final ObjectPool executorPool; private final ObjectPool oobExecutorPool; - private final ChannelExecutor channelExecutor = new ChannelExecutor(); + private final ChannelExecutor channelExecutor = new ChannelExecutor() { + @Override + void handleUncaughtThrowable(Throwable t) { + super.handleUncaughtThrowable(t); + panic(t); + } + }; private boolean fullStreamDecompression; @@ -156,10 +162,14 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume @Nullable private LbHelperImpl lbHelper; - // Must be assigned from channelExecutor. null if channel is in idle mode. + // Must ONLY be assigned from updateSubchannelPicker(), which is called from channelExecutor. + // null if channel is in idle mode. @Nullable private volatile SubchannelPicker subchannelPicker; + // Must be accessed from the channelExecutor + private boolean panicMode; + // Must be mutated from channelExecutor // If any monitoring hook to be added later needs to get a snapshot of this Set, we could // switch to a ConcurrentHashMap. 
@@ -239,16 +249,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume public void transportTerminated() { checkState(shutdown.get(), "Channel must have been shut down"); terminating = true; - if (lbHelper != null) { - lbHelper.lb.shutdown(); - lbHelper = null; - } - if (nameResolver != null) { - nameResolver.shutdown(); - nameResolver = null; - nameResolverStarted = false; - } - + shutdownNameResolverAndLoadBalancer(false); + // No need to call channelStateManager since we are already in SHUTDOWN state. // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them // here. maybeShutdownNowSubchannels(); @@ -324,6 +326,24 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume @Nullable private IdleModeTimer idleModeTimer; + // Must be called from channelExecutor + private void shutdownNameResolverAndLoadBalancer(boolean verifyActive) { + if (verifyActive) { + checkState(nameResolver != null, "nameResolver is null"); + checkState(lbHelper != null, "lbHelper is null"); + } + if (nameResolver != null) { + nameResolver.shutdown(); + nameResolver = null; + nameResolverStarted = false; + } + if (lbHelper != null) { + lbHelper.lb.shutdown(); + lbHelper = null; + } + subchannelPicker = null; + } + /** * Make the channel exit idle mode, if it's in it. * @@ -331,7 +351,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume */ @VisibleForTesting void exitIdleMode() { - if (shutdown.get()) { + if (shutdown.get() || panicMode) { return; } if (inUseStateAggregator.isInUse()) { @@ -366,12 +386,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown() // did not cancel idleModeTimer, or prepareToLoseNetwork() ran while shutdown or in idle, all of // which are bugs. 
- nameResolver.shutdown(); - nameResolverStarted = false; + shutdownNameResolverAndLoadBalancer(true); nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams); - lbHelper.lb.shutdown(); - lbHelper = null; - subchannelPicker = null; channelStateManager.gotoState(IDLE); } @@ -636,6 +652,35 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume return this; } + // Called from channelExecutor + @VisibleForTesting + void panic(final Throwable t) { + if (panicMode) { + // Preserve the first panic information + return; + } + panicMode = true; + cancelIdleTimer(); + shutdownNameResolverAndLoadBalancer(false); + SubchannelPicker newPicker = new SubchannelPicker() { + final PickResult panicPickResult = + PickResult.withDrop( + Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t)); + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return panicPickResult; + } + }; + updateSubchannelPicker(newPicker); + channelStateManager.gotoState(TRANSIENT_FAILURE); + } + + // Called from channelExecutor + private void updateSubchannelPicker(SubchannelPicker newPicker) { + subchannelPicker = newPicker; + delayedTransport.reprocess(newPicker); + } + @Override public boolean isShutdown() { return shutdown.get(); @@ -958,8 +1003,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume if (LbHelperImpl.this != lbHelper) { return; } - subchannelPicker = newPicker; - delayedTransport.reprocess(newPicker); + updateSubchannelPicker(newPicker); // It's not appropriate to report SHUTDOWN state from lb. // Ignore the case of newState == SHUTDOWN for now. 
if (newState != SHUTDOWN) { @@ -1084,16 +1128,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume re); } - try { - balancer.handleResolvedAddressGroups(servers, config); - } catch (Throwable e) { - logger.log( - Level.WARNING, "[" + getLogId() + "] Unexpected exception from LoadBalancer", e); - // It must be a bug! Push the exception back to LoadBalancer in the hope that it may - // be propagated to the application. - balancer.handleNameResolutionError(Status.INTERNAL.withCause(e) - .withDescription("Thrown from handleResolvedAddresses(): " + e)); - } + balancer.handleResolvedAddressGroups(servers, config); } } diff --git a/core/src/test/java/io/grpc/internal/ChannelExecutorTest.java b/core/src/test/java/io/grpc/internal/ChannelExecutorTest.java index 9b0b3c53c1..a3f9c941b3 100644 --- a/core/src/test/java/io/grpc/internal/ChannelExecutorTest.java +++ b/core/src/test/java/io/grpc/internal/ChannelExecutorTest.java @@ -16,14 +16,18 @@ package io.grpc.internal; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,7 +43,13 @@ import org.mockito.stubbing.Answer; */ @RunWith(JUnit4.class) public class ChannelExecutorTest { - private final ChannelExecutor executor = new ChannelExecutor(); + private final BlockingQueue uncaughtErrors = new LinkedBlockingQueue(); + private final ChannelExecutor executor = new ChannelExecutor() { + @Override + void handleUncaughtThrowable(Throwable t) { + uncaughtErrors.add(t); + } + }; @Mock private 
Runnable task1; @@ -54,6 +64,10 @@ public class ChannelExecutorTest { MockitoAnnotations.initMocks(this); } + @After public void tearDown() { + assertThat(uncaughtErrors).isEmpty(); + } + @Test public void singleThread() { executor.executeLater(task1); @@ -135,10 +149,11 @@ public class ChannelExecutorTest { @Test public void taskThrows() { InOrder inOrder = inOrder(task1, task2, task3); + final RuntimeException e = new RuntimeException("Simulated"); doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) { - throw new RuntimeException("Simulated"); + throw e; } }).when(task2).run(); executor.executeLater(task1); @@ -148,5 +163,7 @@ public class ChannelExecutorTest { inOrder.verify(task1).run(); inOrder.verify(task2).run(); inOrder.verify(task3).run(); + assertThat(uncaughtErrors).containsExactly(e); + uncaughtErrors.clear(); } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 2edb025abd..b85eb25d7b 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -26,6 +26,7 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static junit.framework.TestCase.assertNotSame; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -56,6 +57,7 @@ import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ClientStreamTracer; +import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.Context; import io.grpc.EquivalentAddressGroup; @@ -622,11 +624,8 @@ public class ManagedChannelImplTest { // NameResolver returns addresses. 
nameResolverFactory.allResolved(); - // The LoadBalancer will receive the error that it has thrown. - verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture()); - Status status = statusCaptor.getValue(); - assertSame(Status.Code.INTERNAL, status.getCode()); - assertSame(ex, status.getCause()); + // Exception thrown from balancer is caught by ChannelExecutor, making channel enter panic mode. + verifyPanicMode(ex); } @Test @@ -1483,6 +1482,151 @@ public class ManagedChannelImplTest { assertEquals(IDLE, channel.getState(false)); } + @Test + public void panic_whenIdle() { + subtestPanic(IDLE); + } + + @Test + public void panic_whenConnecting() { + subtestPanic(CONNECTING); + } + + @Test + public void panic_whenTransientFailure() { + subtestPanic(TRANSIENT_FAILURE); + } + + @Test + public void panic_whenReady() { + subtestPanic(READY); + } + + private void subtestPanic(ConnectivityState initialState) { + assertNotEquals("We don't test panic mode if it's already SHUTDOWN", SHUTDOWN, initialState); + long idleTimeoutMillis = 2000L; + FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true); + createChannel(nameResolverFactory, NO_INTERCEPTOR, true, idleTimeoutMillis); + + verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); + assertEquals(1, nameResolverFactory.resolvers.size()); + FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0); + + Throwable panicReason = new Exception("Simulated uncaught exception"); + if (initialState == IDLE) { + timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); + } else { + helper.updateBalancingState(initialState, mockPicker); + } + assertEquals(initialState, channel.getState(false)); + + if (initialState == IDLE) { + // IDLE mode will shutdown resolver and balancer + verify(mockLoadBalancer).shutdown(); + assertTrue(resolver.shutdown); + // A new resolver is created + assertEquals(1, nameResolverFactory.resolvers.size()); + 
resolver = nameResolverFactory.resolvers.remove(0); + assertFalse(resolver.shutdown); + } else { + verify(mockLoadBalancer, never()).shutdown(); + assertFalse(resolver.shutdown); + } + + // Make channel panic! + channel.panic(panicReason); + + // Calls buffered in delayedTransport will fail + + // Resolver and balancer are shutdown + verify(mockLoadBalancer).shutdown(); + assertTrue(resolver.shutdown); + + // Channel will stay in TRANSIENT_FAILURE. getState(true) will not revive it. + assertEquals(TRANSIENT_FAILURE, channel.getState(true)); + assertEquals(TRANSIENT_FAILURE, channel.getState(true)); + verifyPanicMode(panicReason); + + // No new resolver or balancer are created + verifyNoMoreInteractions(mockLoadBalancerFactory); + assertEquals(0, nameResolverFactory.resolvers.size()); + + // A misbehaving balancer that calls updateBalancingState() after it's shut down will not be + // able to revive it. + helper.updateBalancingState(READY, mockPicker); + verifyPanicMode(panicReason); + + // Cannot be revived by exitIdleMode() + channel.exitIdleMode(); + verifyPanicMode(panicReason); + + // Can still shutdown normally + channel.shutdown(); + assertTrue(channel.isShutdown()); + assertTrue(channel.isTerminated()); + assertEquals(SHUTDOWN, channel.getState(false)); + + // We didn't stub mockPicker, because it should have never been called in this test. 
+ verifyZeroInteractions(mockPicker); + } + + @Test + public void panic_bufferedCallsWillFail() { + FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true); + createChannel(nameResolverFactory, NO_INTERCEPTOR); + + when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult()); + helper.updateBalancingState(CONNECTING, mockPicker); + + // Start RPCs that will be buffered in delayedTransport + ClientCall call = + channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady()); + call.start(mockCallListener, new Metadata()); + + ClientCall call2 = + channel.newCall(method, CallOptions.DEFAULT.withWaitForReady()); + call2.start(mockCallListener2, new Metadata()); + + executor.runDueTasks(); + verifyZeroInteractions(mockCallListener, mockCallListener2); + + // Enter panic + Throwable panicReason = new Exception("Simulated uncaught exception"); + channel.panic(panicReason); + + // Buffered RPCs fail immediately + executor.runDueTasks(); + verifyCallListenerClosed(mockCallListener, Status.Code.INTERNAL, panicReason); + verifyCallListenerClosed(mockCallListener2, Status.Code.INTERNAL, panicReason); + } + + private void verifyPanicMode(Throwable cause) { + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = + (ClientCall.Listener) mock(ClientCall.Listener.class); + assertEquals(TRANSIENT_FAILURE, channel.getState(false)); + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + call.start(mockListener, new Metadata()); + executor.runDueTasks(); + verifyCallListenerClosed(mockListener, Status.Code.INTERNAL, cause); + + // Channel is dead. No more pending task to possibly revive it. 
+ assertEquals(0, timer.numPendingTasks()); + assertEquals(0, executor.numPendingTasks()); + assertEquals(0, oobExecutor.numPendingTasks()); + } + + private void verifyCallListenerClosed( + ClientCall.Listener listener, Status.Code code, Throwable cause) { + ArgumentCaptor captor = ArgumentCaptor.forClass(null); + verify(listener).onClose(captor.capture(), any(Metadata.class)); + Status rpcStatus = captor.getValue(); + assertEquals(code, rpcStatus.getCode()); + assertSame(cause, rpcStatus.getCause()); + verifyNoMoreInteractions(listener); + } + @Test public void idleTimeoutAndReconnect() { long idleTimeoutMillis = 2000L;