core: add panic mode for ManagedChannelImpl (#4023)

Channel enters this mode whenever there is an uncaught throwable from
its ChannelExecutor, which is where most channel internal states are
mutated, such as load-balancing.

In panic mode, the channel will always report TRANSIENT_FAILURE as its
state, and will fail RPCs with an INTERNAL error code with the
uncaught throwable as the cause, which is helpful for investigating
bugs within gRPC and 3rd-party LoadBalancer implementations.

## Change to existing behavior
Previously if `LoadBalancer` throws in `handleResolvedAddressGroups()`, it would
be routed back to `LoadBalancer.handleNameResolutionError()`. Now it will make
the channel panic.

## Internal refactors
- Refactored out `shutdownNameResolverAndLoadBalancer()`, called from three code paths: `enterIdleMode()`, `delayedTransport.transportTerminated()`, and `panic()`.
- Refactored out `updateSubchannelPicker()`, called from both `updateBalancingState()` and `panic()`.
This commit is contained in:
Kun Zhang 2018-02-22 09:27:08 -08:00 committed by GitHub
parent 137c74d15f
commit 46c1133a1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 247 additions and 42 deletions

View File

@ -34,7 +34,7 @@ import javax.annotation.concurrent.ThreadSafe;
* order as they are submitted.
*/
@ThreadSafe
final class ChannelExecutor {
class ChannelExecutor {
private static final Logger log = Logger.getLogger(ChannelExecutor.class.getName());
private final Object lock = new Object();
@ -51,7 +51,7 @@ final class ChannelExecutor {
* <p>Upon returning, it guarantees that all tasks submitted by {@code executeLater()} before it
* have been or will eventually be run, while not requiring any more calls to {@code drain()}.
*/
void drain() {
final void drain() {
boolean drainLeaseAcquired = false;
while (true) {
Runnable runnable;
@ -72,7 +72,7 @@ final class ChannelExecutor {
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception in ChannelExecutor", t);
handleUncaughtThrowable(t);
}
}
}
@ -82,7 +82,7 @@ final class ChannelExecutor {
*
* @return this ChannelExecutor
*/
ChannelExecutor executeLater(Runnable runnable) {
final ChannelExecutor executeLater(Runnable runnable) {
synchronized (lock) {
queue.add(checkNotNull(runnable, "runnable is null"));
}
@ -90,9 +90,18 @@ final class ChannelExecutor {
}
@VisibleForTesting
int numPendingTasks() {
final int numPendingTasks() {
synchronized (lock) {
return queue.size();
}
}
/**
* Handle a throwable from a task.
*
* <p>The default implementation logs a warning.
*/
void handleUncaughtThrowable(Throwable t) {
log.log(Level.WARNING, "Runnable threw exception in ChannelExecutor", t);
}
}

View File

@ -121,7 +121,13 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
private final ObjectPool<? extends Executor> executorPool;
private final ObjectPool<? extends Executor> oobExecutorPool;
private final ChannelExecutor channelExecutor = new ChannelExecutor();
private final ChannelExecutor channelExecutor = new ChannelExecutor() {
@Override
void handleUncaughtThrowable(Throwable t) {
super.handleUncaughtThrowable(t);
panic(t);
}
};
private boolean fullStreamDecompression;
@ -156,10 +162,14 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
@Nullable
private LbHelperImpl lbHelper;
// Must be assigned from channelExecutor. null if channel is in idle mode.
// Must ONLY be assigned from updateSubchannelPicker(), which is called from channelExecutor.
// null if channel is in idle mode.
@Nullable
private volatile SubchannelPicker subchannelPicker;
// Must be accessed from the channelExecutor
private boolean panicMode;
// Must be mutated from channelExecutor
// If any monitoring hook to be added later needs to get a snapshot of this Set, we could
// switch to a ConcurrentHashMap.
@ -239,16 +249,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
public void transportTerminated() {
checkState(shutdown.get(), "Channel must have been shut down");
terminating = true;
if (lbHelper != null) {
lbHelper.lb.shutdown();
lbHelper = null;
}
if (nameResolver != null) {
nameResolver.shutdown();
nameResolver = null;
nameResolverStarted = false;
}
shutdownNameResolverAndLoadBalancer(false);
// No need to call channelStateManager since we are already in SHUTDOWN state.
// Until LoadBalancer is shutdown, it may still create new subchannels. We catch them
// here.
maybeShutdownNowSubchannels();
@ -324,6 +326,24 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
@Nullable
private IdleModeTimer idleModeTimer;
// Must be called from channelExecutor
private void shutdownNameResolverAndLoadBalancer(boolean verifyActive) {
if (verifyActive) {
checkState(nameResolver != null, "nameResolver is null");
checkState(lbHelper != null, "lbHelper is null");
}
if (nameResolver != null) {
nameResolver.shutdown();
nameResolver = null;
nameResolverStarted = false;
}
if (lbHelper != null) {
lbHelper.lb.shutdown();
lbHelper = null;
}
subchannelPicker = null;
}
/**
* Make the channel exit idle mode, if it's in it.
*
@ -331,7 +351,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
*/
@VisibleForTesting
void exitIdleMode() {
if (shutdown.get()) {
if (shutdown.get() || panicMode) {
return;
}
if (inUseStateAggregator.isInUse()) {
@ -366,12 +386,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
// either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
// did not cancel idleModeTimer, or prepareToLoseNetwork() ran while shutdown or in idle, all of
// which are bugs.
nameResolver.shutdown();
nameResolverStarted = false;
shutdownNameResolverAndLoadBalancer(true);
nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
lbHelper.lb.shutdown();
lbHelper = null;
subchannelPicker = null;
channelStateManager.gotoState(IDLE);
}
@ -636,6 +652,35 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
return this;
}
// Called from channelExecutor
@VisibleForTesting
void panic(final Throwable t) {
if (panicMode) {
// Preserve the first panic information
return;
}
panicMode = true;
cancelIdleTimer();
shutdownNameResolverAndLoadBalancer(false);
SubchannelPicker newPicker = new SubchannelPicker() {
final PickResult panicPickResult =
PickResult.withDrop(
Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return panicPickResult;
}
};
updateSubchannelPicker(newPicker);
channelStateManager.gotoState(TRANSIENT_FAILURE);
}
// Called from channelExecutor
private void updateSubchannelPicker(SubchannelPicker newPicker) {
subchannelPicker = newPicker;
delayedTransport.reprocess(newPicker);
}
@Override
public boolean isShutdown() {
return shutdown.get();
@ -958,8 +1003,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
if (LbHelperImpl.this != lbHelper) {
return;
}
subchannelPicker = newPicker;
delayedTransport.reprocess(newPicker);
updateSubchannelPicker(newPicker);
// It's not appropriate to report SHUTDOWN state from lb.
// Ignore the case of newState == SHUTDOWN for now.
if (newState != SHUTDOWN) {
@ -1084,16 +1128,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
re);
}
try {
balancer.handleResolvedAddressGroups(servers, config);
} catch (Throwable e) {
logger.log(
Level.WARNING, "[" + getLogId() + "] Unexpected exception from LoadBalancer", e);
// It must be a bug! Push the exception back to LoadBalancer in the hope that it may
// be propagated to the application.
balancer.handleNameResolutionError(Status.INTERNAL.withCause(e)
.withDescription("Thrown from handleResolvedAddresses(): " + e));
}
balancer.handleResolvedAddressGroups(servers, config);
}
}

View File

@ -16,14 +16,18 @@
package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -39,7 +43,13 @@ import org.mockito.stubbing.Answer;
*/
@RunWith(JUnit4.class)
public class ChannelExecutorTest {
private final ChannelExecutor executor = new ChannelExecutor();
private final BlockingQueue<Throwable> uncaughtErrors = new LinkedBlockingQueue<Throwable>();
private final ChannelExecutor executor = new ChannelExecutor() {
@Override
void handleUncaughtThrowable(Throwable t) {
uncaughtErrors.add(t);
}
};
@Mock
private Runnable task1;
@ -54,6 +64,10 @@ public class ChannelExecutorTest {
MockitoAnnotations.initMocks(this);
}
@After public void tearDown() {
assertThat(uncaughtErrors).isEmpty();
}
@Test
public void singleThread() {
executor.executeLater(task1);
@ -135,10 +149,11 @@ public class ChannelExecutorTest {
@Test
public void taskThrows() {
InOrder inOrder = inOrder(task1, task2, task3);
final RuntimeException e = new RuntimeException("Simulated");
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
throw new RuntimeException("Simulated");
throw e;
}
}).when(task2).run();
executor.executeLater(task1);
@ -148,5 +163,7 @@ public class ChannelExecutorTest {
inOrder.verify(task1).run();
inOrder.verify(task2).run();
inOrder.verify(task3).run();
assertThat(uncaughtErrors).containsExactly(e);
uncaughtErrors.clear();
}
}

View File

@ -26,6 +26,7 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static junit.framework.TestCase.assertNotSame;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
@ -56,6 +57,7 @@ import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.Context;
import io.grpc.EquivalentAddressGroup;
@ -622,11 +624,8 @@ public class ManagedChannelImplTest {
// NameResolver returns addresses.
nameResolverFactory.allResolved();
// The LoadBalancer will receive the error that it has thrown.
verify(mockLoadBalancer).handleNameResolutionError(statusCaptor.capture());
Status status = statusCaptor.getValue();
assertSame(Status.Code.INTERNAL, status.getCode());
assertSame(ex, status.getCause());
// Exception thrown from balancer is caught by ChannelExecutor, making channel enter panic mode.
verifyPanicMode(ex);
}
@Test
@ -1483,6 +1482,151 @@ public class ManagedChannelImplTest {
assertEquals(IDLE, channel.getState(false));
}
@Test
public void panic_whenIdle() {
subtestPanic(IDLE);
}
@Test
public void panic_whenConnecting() {
subtestPanic(CONNECTING);
}
@Test
public void panic_whenTransientFailure() {
subtestPanic(TRANSIENT_FAILURE);
}
@Test
public void panic_whenReady() {
subtestPanic(READY);
}
private void subtestPanic(ConnectivityState initialState) {
assertNotEquals("We don't test panic mode if it's already SHUTDOWN", SHUTDOWN, initialState);
long idleTimeoutMillis = 2000L;
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
createChannel(nameResolverFactory, NO_INTERCEPTOR, true, idleTimeoutMillis);
verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class));
assertEquals(1, nameResolverFactory.resolvers.size());
FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.remove(0);
Throwable panicReason = new Exception("Simulated uncaught exception");
if (initialState == IDLE) {
timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
} else {
helper.updateBalancingState(initialState, mockPicker);
}
assertEquals(initialState, channel.getState(false));
if (initialState == IDLE) {
// IDLE mode will shutdown resolver and balancer
verify(mockLoadBalancer).shutdown();
assertTrue(resolver.shutdown);
// A new resolver is created
assertEquals(1, nameResolverFactory.resolvers.size());
resolver = nameResolverFactory.resolvers.remove(0);
assertFalse(resolver.shutdown);
} else {
verify(mockLoadBalancer, never()).shutdown();
assertFalse(resolver.shutdown);
}
// Make channel panic!
channel.panic(panicReason);
// Calls buffered in delayedTransport will fail
// Resolver and balancer are shutdown
verify(mockLoadBalancer).shutdown();
assertTrue(resolver.shutdown);
// Channel will stay in TRANSIENT_FAILURE. getState(true) will not revive it.
assertEquals(TRANSIENT_FAILURE, channel.getState(true));
assertEquals(TRANSIENT_FAILURE, channel.getState(true));
verifyPanicMode(panicReason);
// No new resolver or balancer are created
verifyNoMoreInteractions(mockLoadBalancerFactory);
assertEquals(0, nameResolverFactory.resolvers.size());
// A misbehaving balancer that calls updateBalancingState() after it's shut down will not be
// able to revive it.
helper.updateBalancingState(READY, mockPicker);
verifyPanicMode(panicReason);
// Cannot be revived by exitIdleMode()
channel.exitIdleMode();
verifyPanicMode(panicReason);
// Can still shutdown normally
channel.shutdown();
assertTrue(channel.isShutdown());
assertTrue(channel.isTerminated());
assertEquals(SHUTDOWN, channel.getState(false));
// We didn't stub mockPicker, because it should have never been called in this test.
verifyZeroInteractions(mockPicker);
}
@Test
public void panic_bufferedCallsWillFail() {
FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true);
createChannel(nameResolverFactory, NO_INTERCEPTOR);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withNoResult());
helper.updateBalancingState(CONNECTING, mockPicker);
// Start RPCs that will be buffered in delayedTransport
ClientCall<String, Integer> call =
channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
call.start(mockCallListener, new Metadata());
ClientCall<String, Integer> call2 =
channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
call2.start(mockCallListener2, new Metadata());
executor.runDueTasks();
verifyZeroInteractions(mockCallListener, mockCallListener2);
// Enter panic
Throwable panicReason = new Exception("Simulated uncaught exception");
channel.panic(panicReason);
// Buffered RPCs fail immediately
executor.runDueTasks();
verifyCallListenerClosed(mockCallListener, Status.Code.INTERNAL, panicReason);
verifyCallListenerClosed(mockCallListener2, Status.Code.INTERNAL, panicReason);
}
private void verifyPanicMode(Throwable cause) {
@SuppressWarnings("unchecked")
ClientCall.Listener<Integer> mockListener =
(ClientCall.Listener<Integer>) mock(ClientCall.Listener.class);
assertEquals(TRANSIENT_FAILURE, channel.getState(false));
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockListener, new Metadata());
executor.runDueTasks();
verifyCallListenerClosed(mockListener, Status.Code.INTERNAL, cause);
// Channel is dead. No more pending task to possibly revive it.
assertEquals(0, timer.numPendingTasks());
assertEquals(0, executor.numPendingTasks());
assertEquals(0, oobExecutor.numPendingTasks());
}
private void verifyCallListenerClosed(
ClientCall.Listener<Integer> listener, Status.Code code, Throwable cause) {
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(null);
verify(listener).onClose(captor.capture(), any(Metadata.class));
Status rpcStatus = captor.getValue();
assertEquals(code, rpcStatus.getCode());
assertSame(cause, rpcStatus.getCause());
verifyNoMoreInteractions(listener);
}
@Test
public void idleTimeoutAndReconnect() {
long idleTimeoutMillis = 2000L;