diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index edb9fc007e..5590ee9f2e 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -51,14 +51,12 @@ import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; @@ -72,8 +70,6 @@ import javax.annotation.concurrent.ThreadSafe; final class InternalSubchannel implements WithLogId { private static final Logger log = Logger.getLogger(InternalSubchannel.class.getName()); - - private final Object lock = new Object(); private final LogId logId = LogId.allocate(getClass().getName()); private final EquivalentAddressGroup addressGroup; private final String authority; @@ -83,18 +79,18 @@ final class InternalSubchannel implements WithLogId { private final ClientTransportFactory transportFactory; private final ScheduledExecutorService scheduledExecutor; - // A serializing executor shared across the Channel + // File-specific convention: methods without GuardedBy("lock") MUST NOT be called under the lock. + private final Object lock = new Object(); + + // File-specific convention: // - // TODO(zhangkun83): decide the type of Channel Executor. I considered a SerializingExecutor - // based on the app executor, but it seems abusive because the app executor is intended for app - // logic, not for channel bookkeeping. 
We don't want channel bookkeeping logic to contend for - // threads with app logic, which may increase latency or even cause starvation. Instead, we - // should consider a thread-less Executor after the refactor of ManagedChannelImpl is done. + // 1. In a method without GuardedBy("lock"), executeLater() MUST be followed by a drain() later in + // the same method. // - // NOTE: there are cases where channelExecutor.execute() is run under "lock". This will add risk - // of deadlock if channelExecutor is based on a direct executor. Thread-less executor wouldn't - // have such problem. - private final Executor channelExecutor; + // 2. drain() MUST NOT be called under "lock". + // + // 3. Every synchronized("lock") must be inside a try-finally which calls drain() in "finally". + private final ChannelExecutor channelExecutor; @GuardedBy("lock") private int nextAddressIndex; @@ -158,7 +154,7 @@ final class InternalSubchannel implements WithLogId { InternalSubchannel(EquivalentAddressGroup addressGroup, String authority, String userAgent, BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, - Supplier stopwatchSupplier, Executor channelExecutor, Callback callback) { + Supplier stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback) { this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup"); this.authority = authority; this.userAgent = userAgent; @@ -173,35 +169,34 @@ final class InternalSubchannel implements WithLogId { /** * Returns a READY transport that will be used to create new streams. * - *

<p>Returns {@code null} if the state is not READY. + *
<p>
Returns {@code null} if the state is not READY. Will try to connect if state is IDLE. */ @Nullable - final ClientTransport obtainActiveTransport() { + ClientTransport obtainActiveTransport() { ClientTransport savedTransport = activeTransport; if (savedTransport != null) { return savedTransport; } - Runnable runnable = null; - synchronized (lock) { - savedTransport = activeTransport; - // Check again, since it could have changed before acquiring the lock - if (savedTransport != null) { - return savedTransport; + try { + synchronized (lock) { + savedTransport = activeTransport; + // Check again, since it could have changed before acquiring the lock + if (savedTransport != null) { + return savedTransport; + } + if (state.getState() == IDLE) { + gotoNonErrorState(CONNECTING); + startNewTransport(); + } } - if (state.getState() == IDLE) { - gotoNonErrorState(CONNECTING); - runnable = startNewTransport(); - } - } - if (runnable != null) { - runnable.run(); + } finally { + channelExecutor.drain(); } return null; } - @CheckReturnValue @GuardedBy("lock") - private Runnable startNewTransport() { + private void startNewTransport() { Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled"); if (nextAddressIndex == 0) { @@ -217,11 +212,14 @@ final class InternalSubchannel implements WithLogId { transportFactory.newClientTransport(address, authority, userAgent); if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "[{0}] Created {1} for {2}", - new Object[] {getLogId(), transport.getLogId(), address}); + new Object[] {logId, transport.getLogId(), address}); } pendingTransport = transport; transports.add(transport); - return transport.start(new TransportListener(transport, address)); + Runnable runnable = transport.start(new TransportListener(transport, address)); + if (runnable != null) { + channelExecutor.executeLater(runnable); + } } /** @@ -235,7 +233,6 @@ final class InternalSubchannel implements WithLogId { @Override public void run() { try 
{ - Runnable runnable = null; synchronized (lock) { reconnectTask = null; if (state.getState() == SHUTDOWN) { @@ -244,13 +241,12 @@ final class InternalSubchannel implements WithLogId { return; } gotoNonErrorState(CONNECTING); - runnable = startNewTransport(); - } - if (runnable != null) { - runnable.run(); + startNewTransport(); } } catch (Throwable t) { log.log(Level.WARNING, "Exception handling end of backoff", t); + } finally { + channelExecutor.drain(); } } } @@ -262,8 +258,7 @@ final class InternalSubchannel implements WithLogId { long delayMillis = reconnectPolicy.nextBackoffMillis() - connectingTimer.elapsed(TimeUnit.MILLISECONDS); if (log.isLoggable(Level.FINE)) { - log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms", - new Object[]{getLogId(), delayMillis}); + log.log(Level.FINE, "[{0}] Scheduling backoff for {1} ms", new Object[]{logId, delayMillis}); } Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done"); reconnectTask = scheduledExecutor.schedule( @@ -283,7 +278,7 @@ final class InternalSubchannel implements WithLogId { Preconditions.checkState(state.getState() != SHUTDOWN, "Cannot transition out of SHUTDOWN to " + newState); state = newState; - channelExecutor.execute(new Runnable() { + channelExecutor.executeLater(new Runnable() { @Override public void run() { callback.onStateChange(InternalSubchannel.this, newState); @@ -295,22 +290,26 @@ final class InternalSubchannel implements WithLogId { public void shutdown() { ManagedClientTransport savedActiveTransport; ConnectionClientTransport savedPendingTransport; - synchronized (lock) { - if (state.getState() == SHUTDOWN) { - return; - } - gotoNonErrorState(SHUTDOWN); - savedActiveTransport = activeTransport; - savedPendingTransport = pendingTransport; - activeTransport = null; - pendingTransport = null; - if (transports.isEmpty()) { - handleTermination(); - if (log.isLoggable(Level.FINE)) { - log.log(Level.FINE, "[{0}] Terminated in shutdown()", getLogId()); + try 
{ + synchronized (lock) { + if (state.getState() == SHUTDOWN) { + return; } - } // else: the callback will be run once all transports have been terminated - cancelReconnectTask(); + gotoNonErrorState(SHUTDOWN); + savedActiveTransport = activeTransport; + savedPendingTransport = pendingTransport; + activeTransport = null; + pendingTransport = null; + if (transports.isEmpty()) { + handleTermination(); + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, "[{0}] Terminated in shutdown()", logId); + } + } // else: the callback will be run once all transports have been terminated + cancelReconnectTask(); + } + } finally { + channelExecutor.drain(); } if (savedActiveTransport != null) { savedActiveTransport.shutdown(); @@ -320,9 +319,9 @@ final class InternalSubchannel implements WithLogId { } } - // May be called under lock. + @GuardedBy("lock") private void handleTermination() { - channelExecutor.execute(new Runnable() { + channelExecutor.executeLater(new Runnable() { @Override public void run() { callback.onTerminated(InternalSubchannel.this); @@ -332,25 +331,33 @@ final class InternalSubchannel implements WithLogId { private void handleTransportInUseState( final ConnectionClientTransport transport, final boolean inUse) { - channelExecutor.execute(new Runnable() { + channelExecutor.executeLater(new Runnable() { @Override public void run() { inUseStateAggregator.updateObjectInUse(transport, inUse); } - }); + }).drain(); } void shutdownNow(Status reason) { shutdown(); Collection transportsCopy; - synchronized (lock) { - transportsCopy = new ArrayList(transports); + try { + synchronized (lock) { + transportsCopy = new ArrayList(transports); + } + } finally { + channelExecutor.drain(); } for (ManagedClientTransport transport : transportsCopy) { transport.shutdownNow(reason); } } + EquivalentAddressGroup getAddressGroup() { + return addressGroup; + } + @GuardedBy("lock") private void cancelReconnectTask() { if (reconnectTask != null) { @@ -366,8 +373,12 @@ final class 
InternalSubchannel implements WithLogId { @VisibleForTesting ConnectivityState getState() { - synchronized (lock) { - return state.getState(); + try { + synchronized (lock) { + return state.getState(); + } + } finally { + channelExecutor.drain(); } } @@ -385,22 +396,26 @@ final class InternalSubchannel implements WithLogId { public void transportReady() { if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "[{0}] {1} for {2} is ready", - new Object[] {getLogId(), transport.getLogId(), address}); + new Object[] {logId, transport.getLogId(), address}); } ConnectivityState savedState; - synchronized (lock) { - savedState = state.getState(); - reconnectPolicy = null; - nextAddressIndex = 0; - if (savedState == SHUTDOWN) { - // activeTransport should have already been set to null by shutdown(). We keep it null. - Preconditions.checkState(activeTransport == null, - "Unexpected non-null activeTransport"); - } else if (pendingTransport == transport) { - gotoNonErrorState(READY); - activeTransport = transport; - pendingTransport = null; + try { + synchronized (lock) { + savedState = state.getState(); + reconnectPolicy = null; + nextAddressIndex = 0; + if (savedState == SHUTDOWN) { + // activeTransport should have already been set to null by shutdown(). We keep it null. 
+ Preconditions.checkState(activeTransport == null, + "Unexpected non-null activeTransport"); + } else if (pendingTransport == transport) { + gotoNonErrorState(READY); + activeTransport = transport; + pendingTransport = null; + } } + } finally { + channelExecutor.drain(); } if (savedState == SHUTDOWN) { transport.shutdown(); @@ -416,31 +431,31 @@ final class InternalSubchannel implements WithLogId { public void transportShutdown(Status s) { if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "[{0}] {1} for {2} is being shutdown with status {3}", - new Object[] {getLogId(), transport.getLogId(), address, s}); + new Object[] {logId, transport.getLogId(), address, s}); } - Runnable runnable = null; - synchronized (lock) { - if (state.getState() == SHUTDOWN) { - return; - } - if (activeTransport == transport) { - gotoNonErrorState(IDLE); - activeTransport = null; - } else if (pendingTransport == transport) { - Preconditions.checkState(state.getState() == CONNECTING, - "Expected state is CONNECTING, actual state is %s", state.getState()); - // Continue reconnect if there are still addresses to try. - if (nextAddressIndex == 0) { - // Initiate backoff - // Transition to TRANSIENT_FAILURE - scheduleBackoff(s); - } else { - runnable = startNewTransport(); + try { + synchronized (lock) { + if (state.getState() == SHUTDOWN) { + return; + } + if (activeTransport == transport) { + gotoNonErrorState(IDLE); + activeTransport = null; + } else if (pendingTransport == transport) { + Preconditions.checkState(state.getState() == CONNECTING, + "Expected state is CONNECTING, actual state is %s", state.getState()); + // Continue reconnect if there are still addresses to try. 
+ if (nextAddressIndex == 0) { + // Initiate backoff + // Transition to TRANSIENT_FAILURE + scheduleBackoff(s); + } else { + startNewTransport(); + } } } - } - if (runnable != null) { - runnable.run(); + } finally { + channelExecutor.drain(); } } @@ -448,17 +463,21 @@ final class InternalSubchannel implements WithLogId { public void transportTerminated() { if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "[{0}] {1} for {2} is terminated", - new Object[] {getLogId(), transport.getLogId(), address}); + new Object[] {logId, transport.getLogId(), address}); } handleTransportInUseState(transport, false); - synchronized (lock) { - transports.remove(transport); - if (state.getState() == SHUTDOWN && transports.isEmpty()) { - if (log.isLoggable(Level.FINE)) { - log.log(Level.FINE, "[{0}] Terminated in transportTerminated()", getLogId()); + try { + synchronized (lock) { + transports.remove(transport); + if (state.getState() == SHUTDOWN && transports.isEmpty()) { + if (log.isLoggable(Level.FINE)) { + log.log(Level.FINE, "[{0}] Terminated in transportTerminated()", logId); + } + handleTermination(); } - handleTermination(); } + } finally { + channelExecutor.drain(); } Preconditions.checkState(activeTransport != transport, "activeTransport still points to this transport. 
" diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index 1032949431..1dc7ec95a9 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -85,8 +85,7 @@ public class InternalSubchannelTest { private final FakeClock fakeClock = new FakeClock(); // For channelExecutor private final FakeClock fakeExecutor = new FakeClock(); - private final SerializingExecutor channelExecutor = - new SerializingExecutor(fakeExecutor.getScheduledExecutorService()); + private final ChannelExecutor channelExecutor = new ChannelExecutor(); @Mock private BackoffPolicy mockBackoffPolicy1; @Mock private BackoffPolicy mockBackoffPolicy2; @@ -678,10 +677,7 @@ public class InternalSubchannelTest { } private void assertExactCallbackInvokes(String ... expectedInvokes) { - // Make sure all callbacks are to run from channelExecutor only. - assertEquals(0, callbackInvokes.size()); - - while (fakeExecutor.runDueTasks() > 0) {} + assertEquals(0, channelExecutor.numPendingTasks()); assertEquals(Arrays.asList(expectedInvokes), callbackInvokes); callbackInvokes.clear(); }