core: InternalSubchannel uses ChannelExecutor. (#2503)

This commit is contained in:
Kun Zhang 2016-12-16 14:17:38 -08:00 committed by GitHub
parent cb6cf1ae2f
commit f8f569e078
2 changed files with 128 additions and 113 deletions

View File

@ -51,14 +51,12 @@ import java.net.SocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
@ -72,8 +70,6 @@ import javax.annotation.concurrent.ThreadSafe;
final class InternalSubchannel implements WithLogId { final class InternalSubchannel implements WithLogId {
private static final Logger log = Logger.getLogger(InternalSubchannel.class.getName()); private static final Logger log = Logger.getLogger(InternalSubchannel.class.getName());
private final Object lock = new Object();
private final LogId logId = LogId.allocate(getClass().getName()); private final LogId logId = LogId.allocate(getClass().getName());
private final EquivalentAddressGroup addressGroup; private final EquivalentAddressGroup addressGroup;
private final String authority; private final String authority;
@ -83,18 +79,18 @@ final class InternalSubchannel implements WithLogId {
private final ClientTransportFactory transportFactory; private final ClientTransportFactory transportFactory;
private final ScheduledExecutorService scheduledExecutor; private final ScheduledExecutorService scheduledExecutor;
// A serializing executor shared across the Channel // File-specific convention: methods without GuardedBy("lock") MUST NOT be called under the lock.
private final Object lock = new Object();
// File-specific convention:
// //
// TODO(zhangkun83): decide the type of Channel Executor. I considered a SerializingExecutor // 1. In a method without GuardedBy("lock"), executeLater() MUST be followed by a drain() later in
// based on the app executor, but it seems abusive because the app executor is intended for app // the same method.
// logic, not for channel bookkeeping. We don't want channel bookkeeping logic to contend for
// threads with app logic, which may increase latency or even cause starvation. Instead, we
// should consider a thread-less Executor after the refactor of ManagedChannelImpl is done.
// //
// NOTE: there are cases where channelExecutor.execute() is run under "lock". This will add risk // 2. drain() MUST NOT be called under "lock".
// of deadlock if channelExecutor is based on a direct executor. Thread-less executor wouldn't //
// have such problem. // 3. Every synchronized("lock") must be inside a try-finally which calls drain() in "finally".
private final Executor channelExecutor; private final ChannelExecutor channelExecutor;
@GuardedBy("lock") @GuardedBy("lock")
private int nextAddressIndex; private int nextAddressIndex;
@ -158,7 +154,7 @@ final class InternalSubchannel implements WithLogId {
InternalSubchannel(EquivalentAddressGroup addressGroup, String authority, String userAgent, InternalSubchannel(EquivalentAddressGroup addressGroup, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider, BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
Supplier<Stopwatch> stopwatchSupplier, Executor channelExecutor, Callback callback) { Supplier<Stopwatch> stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback) {
this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup"); this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup");
this.authority = authority; this.authority = authority;
this.userAgent = userAgent; this.userAgent = userAgent;
@ -173,35 +169,34 @@ final class InternalSubchannel implements WithLogId {
/** /**
* Returns a READY transport that will be used to create new streams. * Returns a READY transport that will be used to create new streams.
* *
* <p>Returns {@code null} if the state is not READY. * <p>Returns {@code null} if the state is not READY. Will try to connect if state is IDLE.
*/ */
@Nullable @Nullable
final ClientTransport obtainActiveTransport() { ClientTransport obtainActiveTransport() {
ClientTransport savedTransport = activeTransport; ClientTransport savedTransport = activeTransport;
if (savedTransport != null) { if (savedTransport != null) {
return savedTransport; return savedTransport;
} }
Runnable runnable = null; try {
synchronized (lock) { synchronized (lock) {
savedTransport = activeTransport; savedTransport = activeTransport;
// Check again, since it could have changed before acquiring the lock // Check again, since it could have changed before acquiring the lock
if (savedTransport != null) { if (savedTransport != null) {
return savedTransport; return savedTransport;
}
if (state.getState() == IDLE) {
gotoNonErrorState(CONNECTING);
startNewTransport();
}
} }
if (state.getState() == IDLE) { } finally {
gotoNonErrorState(CONNECTING); channelExecutor.drain();
runnable = startNewTransport();
}
}
if (runnable != null) {
runnable.run();
} }
return null; return null;
} }
@CheckReturnValue
@GuardedBy("lock") @GuardedBy("lock")
private Runnable startNewTransport() { private void startNewTransport() {
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled"); Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
if (nextAddressIndex == 0) { if (nextAddressIndex == 0) {
@ -217,11 +212,14 @@ final class InternalSubchannel implements WithLogId {
transportFactory.newClientTransport(address, authority, userAgent); transportFactory.newClientTransport(address, authority, userAgent);
if (log.isLoggable(Level.FINE)) { if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Created {1} for {2}", log.log(Level.FINE, "[{0}] Created {1} for {2}",
new Object[] {getLogId(), transport.getLogId(), address}); new Object[] {logId, transport.getLogId(), address});
} }
pendingTransport = transport; pendingTransport = transport;
transports.add(transport); transports.add(transport);
return transport.start(new TransportListener(transport, address)); Runnable runnable = transport.start(new TransportListener(transport, address));
if (runnable != null) {
channelExecutor.executeLater(runnable);
}
} }
/** /**
@ -235,7 +233,6 @@ final class InternalSubchannel implements WithLogId {
@Override @Override
public void run() { public void run() {
try { try {
Runnable runnable = null;
synchronized (lock) { synchronized (lock) {
reconnectTask = null; reconnectTask = null;
if (state.getState() == SHUTDOWN) { if (state.getState() == SHUTDOWN) {
@ -244,13 +241,12 @@ final class InternalSubchannel implements WithLogId {
return; return;
} }
gotoNonErrorState(CONNECTING); gotoNonErrorState(CONNECTING);
runnable = startNewTransport(); startNewTransport();
}
if (runnable != null) {
runnable.run();
} }
} catch (Throwable t) { } catch (Throwable t) {
log.log(Level.WARNING, "Exception handling end of backoff", t); log.log(Level.WARNING, "Exception handling end of backoff", t);
} finally {
channelExecutor.drain();
} }
} }
} }
@ -262,8 +258,7 @@ final class InternalSubchannel implements WithLogId {
long delayMillis = long delayMillis =
reconnectPolicy.nextBackoffMillis() - connectingTimer.elapsed(TimeUnit.MILLISECONDS); reconnectPolicy.nextBackoffMillis() - connectingTimer.elapsed(TimeUnit.MILLISECONDS);
if (log.isLoggable(Level.FINE)) { if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms", log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms", new Object[]{logId, delayMillis});
new Object[]{getLogId(), delayMillis});
} }
Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done"); Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
reconnectTask = scheduledExecutor.schedule( reconnectTask = scheduledExecutor.schedule(
@ -283,7 +278,7 @@ final class InternalSubchannel implements WithLogId {
Preconditions.checkState(state.getState() != SHUTDOWN, Preconditions.checkState(state.getState() != SHUTDOWN,
"Cannot transition out of SHUTDOWN to " + newState); "Cannot transition out of SHUTDOWN to " + newState);
state = newState; state = newState;
channelExecutor.execute(new Runnable() { channelExecutor.executeLater(new Runnable() {
@Override @Override
public void run() { public void run() {
callback.onStateChange(InternalSubchannel.this, newState); callback.onStateChange(InternalSubchannel.this, newState);
@ -295,22 +290,26 @@ final class InternalSubchannel implements WithLogId {
public void shutdown() { public void shutdown() {
ManagedClientTransport savedActiveTransport; ManagedClientTransport savedActiveTransport;
ConnectionClientTransport savedPendingTransport; ConnectionClientTransport savedPendingTransport;
synchronized (lock) { try {
if (state.getState() == SHUTDOWN) { synchronized (lock) {
return; if (state.getState() == SHUTDOWN) {
} return;
gotoNonErrorState(SHUTDOWN);
savedActiveTransport = activeTransport;
savedPendingTransport = pendingTransport;
activeTransport = null;
pendingTransport = null;
if (transports.isEmpty()) {
handleTermination();
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Terminated in shutdown()", getLogId());
} }
} // else: the callback will be run once all transports have been terminated gotoNonErrorState(SHUTDOWN);
cancelReconnectTask(); savedActiveTransport = activeTransport;
savedPendingTransport = pendingTransport;
activeTransport = null;
pendingTransport = null;
if (transports.isEmpty()) {
handleTermination();
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Terminated in shutdown()", logId);
}
} // else: the callback will be run once all transports have been terminated
cancelReconnectTask();
}
} finally {
channelExecutor.drain();
} }
if (savedActiveTransport != null) { if (savedActiveTransport != null) {
savedActiveTransport.shutdown(); savedActiveTransport.shutdown();
@ -320,9 +319,9 @@ final class InternalSubchannel implements WithLogId {
} }
} }
// May be called under lock. @GuardedBy("lock")
private void handleTermination() { private void handleTermination() {
channelExecutor.execute(new Runnable() { channelExecutor.executeLater(new Runnable() {
@Override @Override
public void run() { public void run() {
callback.onTerminated(InternalSubchannel.this); callback.onTerminated(InternalSubchannel.this);
@ -332,25 +331,33 @@ final class InternalSubchannel implements WithLogId {
private void handleTransportInUseState( private void handleTransportInUseState(
final ConnectionClientTransport transport, final boolean inUse) { final ConnectionClientTransport transport, final boolean inUse) {
channelExecutor.execute(new Runnable() { channelExecutor.executeLater(new Runnable() {
@Override @Override
public void run() { public void run() {
inUseStateAggregator.updateObjectInUse(transport, inUse); inUseStateAggregator.updateObjectInUse(transport, inUse);
} }
}); }).drain();
} }
void shutdownNow(Status reason) { void shutdownNow(Status reason) {
shutdown(); shutdown();
Collection<ManagedClientTransport> transportsCopy; Collection<ManagedClientTransport> transportsCopy;
synchronized (lock) { try {
transportsCopy = new ArrayList<ManagedClientTransport>(transports); synchronized (lock) {
transportsCopy = new ArrayList<ManagedClientTransport>(transports);
}
} finally {
channelExecutor.drain();
} }
for (ManagedClientTransport transport : transportsCopy) { for (ManagedClientTransport transport : transportsCopy) {
transport.shutdownNow(reason); transport.shutdownNow(reason);
} }
} }
EquivalentAddressGroup getAddressGroup() {
return addressGroup;
}
@GuardedBy("lock") @GuardedBy("lock")
private void cancelReconnectTask() { private void cancelReconnectTask() {
if (reconnectTask != null) { if (reconnectTask != null) {
@ -366,8 +373,12 @@ final class InternalSubchannel implements WithLogId {
@VisibleForTesting @VisibleForTesting
ConnectivityState getState() { ConnectivityState getState() {
synchronized (lock) { try {
return state.getState(); synchronized (lock) {
return state.getState();
}
} finally {
channelExecutor.drain();
} }
} }
@ -385,22 +396,26 @@ final class InternalSubchannel implements WithLogId {
public void transportReady() { public void transportReady() {
if (log.isLoggable(Level.FINE)) { if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} for {2} is ready", log.log(Level.FINE, "[{0}] {1} for {2} is ready",
new Object[] {getLogId(), transport.getLogId(), address}); new Object[] {logId, transport.getLogId(), address});
} }
ConnectivityState savedState; ConnectivityState savedState;
synchronized (lock) { try {
savedState = state.getState(); synchronized (lock) {
reconnectPolicy = null; savedState = state.getState();
nextAddressIndex = 0; reconnectPolicy = null;
if (savedState == SHUTDOWN) { nextAddressIndex = 0;
// activeTransport should have already been set to null by shutdown(). We keep it null. if (savedState == SHUTDOWN) {
Preconditions.checkState(activeTransport == null, // activeTransport should have already been set to null by shutdown(). We keep it null.
"Unexpected non-null activeTransport"); Preconditions.checkState(activeTransport == null,
} else if (pendingTransport == transport) { "Unexpected non-null activeTransport");
gotoNonErrorState(READY); } else if (pendingTransport == transport) {
activeTransport = transport; gotoNonErrorState(READY);
pendingTransport = null; activeTransport = transport;
pendingTransport = null;
}
} }
} finally {
channelExecutor.drain();
} }
if (savedState == SHUTDOWN) { if (savedState == SHUTDOWN) {
transport.shutdown(); transport.shutdown();
@ -416,31 +431,31 @@ final class InternalSubchannel implements WithLogId {
public void transportShutdown(Status s) { public void transportShutdown(Status s) {
if (log.isLoggable(Level.FINE)) { if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} for {2} is being shutdown with status {3}", log.log(Level.FINE, "[{0}] {1} for {2} is being shutdown with status {3}",
new Object[] {getLogId(), transport.getLogId(), address, s}); new Object[] {logId, transport.getLogId(), address, s});
} }
Runnable runnable = null; try {
synchronized (lock) { synchronized (lock) {
if (state.getState() == SHUTDOWN) { if (state.getState() == SHUTDOWN) {
return; return;
} }
if (activeTransport == transport) { if (activeTransport == transport) {
gotoNonErrorState(IDLE); gotoNonErrorState(IDLE);
activeTransport = null; activeTransport = null;
} else if (pendingTransport == transport) { } else if (pendingTransport == transport) {
Preconditions.checkState(state.getState() == CONNECTING, Preconditions.checkState(state.getState() == CONNECTING,
"Expected state is CONNECTING, actual state is %s", state.getState()); "Expected state is CONNECTING, actual state is %s", state.getState());
// Continue reconnect if there are still addresses to try. // Continue reconnect if there are still addresses to try.
if (nextAddressIndex == 0) { if (nextAddressIndex == 0) {
// Initiate backoff // Initiate backoff
// Transition to TRANSIENT_FAILURE // Transition to TRANSIENT_FAILURE
scheduleBackoff(s); scheduleBackoff(s);
} else { } else {
runnable = startNewTransport(); startNewTransport();
}
} }
} }
} } finally {
if (runnable != null) { channelExecutor.drain();
runnable.run();
} }
} }
@ -448,17 +463,21 @@ final class InternalSubchannel implements WithLogId {
public void transportTerminated() { public void transportTerminated() {
if (log.isLoggable(Level.FINE)) { if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} for {2} is terminated", log.log(Level.FINE, "[{0}] {1} for {2} is terminated",
new Object[] {getLogId(), transport.getLogId(), address}); new Object[] {logId, transport.getLogId(), address});
} }
handleTransportInUseState(transport, false); handleTransportInUseState(transport, false);
synchronized (lock) { try {
transports.remove(transport); synchronized (lock) {
if (state.getState() == SHUTDOWN && transports.isEmpty()) { transports.remove(transport);
if (log.isLoggable(Level.FINE)) { if (state.getState() == SHUTDOWN && transports.isEmpty()) {
log.log(Level.FINE, "[{0}] Terminated in transportTerminated()", getLogId()); if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Terminated in transportTerminated()", logId);
}
handleTermination();
} }
handleTermination();
} }
} finally {
channelExecutor.drain();
} }
Preconditions.checkState(activeTransport != transport, Preconditions.checkState(activeTransport != transport,
"activeTransport still points to this transport. " "activeTransport still points to this transport. "

View File

@ -85,8 +85,7 @@ public class InternalSubchannelTest {
private final FakeClock fakeClock = new FakeClock(); private final FakeClock fakeClock = new FakeClock();
// For channelExecutor // For channelExecutor
private final FakeClock fakeExecutor = new FakeClock(); private final FakeClock fakeExecutor = new FakeClock();
private final SerializingExecutor channelExecutor = private final ChannelExecutor channelExecutor = new ChannelExecutor();
new SerializingExecutor(fakeExecutor.getScheduledExecutorService());
@Mock private BackoffPolicy mockBackoffPolicy1; @Mock private BackoffPolicy mockBackoffPolicy1;
@Mock private BackoffPolicy mockBackoffPolicy2; @Mock private BackoffPolicy mockBackoffPolicy2;
@ -678,10 +677,7 @@ public class InternalSubchannelTest {
} }
private void assertExactCallbackInvokes(String ... expectedInvokes) { private void assertExactCallbackInvokes(String ... expectedInvokes) {
// Make sure all callbacks are to run from channelExecutor only. assertEquals(0, channelExecutor.numPendingTasks());
assertEquals(0, callbackInvokes.size());
while (fakeExecutor.runDueTasks() > 0) {}
assertEquals(Arrays.asList(expectedInvokes), callbackInvokes); assertEquals(Arrays.asList(expectedInvokes), callbackInvokes);
callbackInvokes.clear(); callbackInvokes.clear();
} }