From e9fa8c99ceba92753998ee7c3b5fece44aa635b9 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 16 Nov 2016 00:01:23 -0800 Subject: [PATCH] core: Only use scheduled executor for timer tasks This removes an abuse of scheduled executor in ManagedChannelImpl. The executor was used to avoid deadlocking. Now we run the work on the same thread, but delay it until locks have been released. There is no need to fix ManagedChannelImpl2. Due to its different threading model it didn't have need to abuse the scheduledExecutor. Fixes #2444 --- .../grpc/internal/InUseStateAggregator.java | 17 +++- .../io/grpc/internal/ManagedChannelImpl.java | 92 +++++++++++-------- .../java/io/grpc/internal/TransportSet.java | 20 ++-- .../ManagedChannelImplIdlenessTest.java | 4 +- .../grpc/internal/ManagedChannelImplTest.java | 4 +- ...anagedChannelImplTransportManagerTest.java | 2 +- 6 files changed, 84 insertions(+), 55 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/InUseStateAggregator.java b/core/src/main/java/io/grpc/internal/InUseStateAggregator.java index 3cd2104f39..45e3b26de3 100644 --- a/core/src/main/java/io/grpc/internal/InUseStateAggregator.java +++ b/core/src/main/java/io/grpc/internal/InUseStateAggregator.java @@ -33,6 +33,7 @@ package io.grpc.internal; import java.util.HashSet; +import javax.annotation.CheckReturnValue; import javax.annotation.concurrent.GuardedBy; /** @@ -44,15 +45,18 @@ abstract class InUseStateAggregator { private final HashSet inUseObjects = new HashSet(); /** - * Update the in-use state of an object. Initially no object is in use. + * Update the in-use state of an object. Initially no object is in use. When the return value is + * non-{@code null}, the caller should execute the runnable after releasing locks. */ - final void updateObjectInUse(T object, boolean inUse) { + @CheckReturnValue + final Runnable updateObjectInUse(T object, boolean inUse) { + Runnable runnable = null; synchronized (getLock()) { int origSize = inUseObjects.size(); if (inUse) { inUseObjects.add(object); if (origSize == 0) { - handleInUse(); + runnable = handleInUse(); } } else { boolean removed = inUseObjects.remove(object); @@ -61,8 +65,10 @@ abstract class InUseStateAggregator { } } } + return runnable; } + @CheckReturnValue final boolean isInUse() { synchronized (getLock()) { return !inUseObjects.isEmpty(); @@ -73,12 +79,13 @@ abstract class InUseStateAggregator { /** * Called when the aggregated in-use state has changed to true, which means at least one object is - * in use. + * in use. When the return value is non-{@code null}, then the runnable will be executed by the + * caller of {@link #updateObjectInUse} after releasing locks. * *

This method is called under the lock returned by {@link #getLock}. */ @GuardedBy("getLock()") - abstract void handleInUse(); + abstract Runnable handleInUse(); /** * Called when the aggregated in-use state has changed to false, which means no object is in use. diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index f2d40abf0b..b6edab8653 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -197,8 +197,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI @Override @GuardedBy("lock") - void handleInUse() { - exitIdleMode(); + Runnable handleInUse() { + return exitIdleMode(); } @GuardedBy("lock") @@ -262,38 +262,52 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI /** * Make the channel exit idle mode, if it's in it. Return a LoadBalancer that can be used for * making new requests. Return null if the channel is shutdown. - * - *

May be called under the lock. */ @VisibleForTesting - LoadBalancer exitIdleMode() { + LoadBalancer exitIdleModeAndGetLb() { + Runnable runnable; + LoadBalancer balancer; + synchronized (lock) { + runnable = exitIdleMode(); + balancer = loadBalancer; + } + if (runnable != null) { + runnable.run(); + } + return balancer; + } + + /** + * Make the channel exit idle mode, if it's in it. If the returned runnable is non-{@code null}, + * then it should be executed by the caller after releasing {@code lock}. + */ + @GuardedBy("lock") + private Runnable exitIdleMode() { final LoadBalancer balancer; final NameResolver resolver; - synchronized (lock) { - if (shutdown) { - return null; - } - if (inUseStateAggregator.isInUse()) { - cancelIdleTimer(); - } else { - // exitIdleMode() may be called outside of inUseStateAggregator, which may still in - // "not-in-use" state. If it's the case, we start the timer which will be soon cancelled if - // the aggregator receives actual uses. - rescheduleIdleTimer(); - } - if (graceLoadBalancer != null) { - // Exit grace period; timer already rescheduled above. - loadBalancer = graceLoadBalancer; - graceLoadBalancer = null; - } - if (loadBalancer != null) { - return loadBalancer; - } - log.log(Level.FINE, "[{0}] Exiting idle mode", getLogId()); - balancer = loadBalancerFactory.newLoadBalancer(nameResolver.getServiceAuthority(), tm); - this.loadBalancer = balancer; - resolver = this.nameResolver; + if (shutdown) { + return null; } + if (inUseStateAggregator.isInUse()) { + cancelIdleTimer(); + } else { + // exitIdleMode() may be called outside of inUseStateAggregator, which may still in + // "not-in-use" state. If it's the case, we start the timer which will be soon cancelled if + // the aggregator receives actual uses. + rescheduleIdleTimer(); + } + if (graceLoadBalancer != null) { + // Exit grace period; timer already rescheduled above. + loadBalancer = graceLoadBalancer; + graceLoadBalancer = null; + } + if (loadBalancer != null) { + return null; + } + log.log(Level.FINE, "[{0}] Exiting idle mode", getLogId()); + balancer = loadBalancerFactory.newLoadBalancer(nameResolver.getServiceAuthority(), tm); + this.loadBalancer = balancer; + resolver = this.nameResolver; class NameResolverStartTask implements Runnable { @Override public void run() { @@ -308,8 +322,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI } } - scheduledExecutor.execute(new NameResolverStartTask()); - return balancer; + return new NameResolverStartTask(); } @VisibleForTesting @@ -366,7 +379,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI LoadBalancer balancer = loadBalancer; if (balancer == null) { // Current state is either idle or in grace period - balancer = exitIdleMode(); + balancer = exitIdleModeAndGetLb(); } if (balancer == null) { return SHUTDOWN_TRANSPORT; @@ -709,13 +722,14 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI } @Override - public void onInUse(TransportSet ts) { - inUseStateAggregator.updateObjectInUse(ts, true); + public Runnable onInUse(TransportSet ts) { + return inUseStateAggregator.updateObjectInUse(ts, true); } @Override public void onNotInUse(TransportSet ts) { - inUseStateAggregator.updateObjectInUse(ts, false); + Runnable r = inUseStateAggregator.updateObjectInUse(ts, false); + assert r == null; } }); if (log.isLoggable(Level.FINE)) { @@ -806,13 +820,17 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI delayedTransports.remove(delayedTransport); maybeTerminateChannel(); } - inUseStateAggregator.updateObjectInUse(delayedTransport, false); + Runnable r = inUseStateAggregator.updateObjectInUse(delayedTransport, false); + assert r == null; } @Override public void transportReady() {} @Override public void transportInUse(boolean inUse) { - inUseStateAggregator.updateObjectInUse(delayedTransport, inUse); + Runnable r = inUseStateAggregator.updateObjectInUse(delayedTransport, inUse); + if (r != null) { + r.run(); + } } }); boolean savedShutdown; diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index 501670e745..07c1e68b39 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -120,8 +120,8 @@ final class TransportSet extends ManagedChannel implements WithLogId { } @Override - void handleInUse() { - callback.onInUse(TransportSet.this); + Runnable handleInUse() { + return callback.onInUse(TransportSet.this); } @Override @@ -430,7 +430,10 @@ final class TransportSet extends ManagedChannel implements WithLogId { @Override public void transportInUse(boolean inUse) { - inUseStateAggregator.updateObjectInUse(transport, inUse); + Runnable r = inUseStateAggregator.updateObjectInUse(transport, inUse); + if (r != null) { + r.run(); + } } @Override @@ -439,7 +442,8 @@ final class TransportSet extends ManagedChannel implements WithLogId { @Override public void transportTerminated() { boolean runCallback = false; - inUseStateAggregator.updateObjectInUse(transport, false); + Runnable r = inUseStateAggregator.updateObjectInUse(transport, false); + assert r == null; synchronized (lock) { transports.remove(transport); if (shutdown && transports.isEmpty()) { @@ -585,9 +589,13 @@ final class TransportSet extends ManagedChannel implements WithLogId { /** * Called when the TransportSet's in-use state has changed to true, which means at least one - * transport is in use. This method is called under a lock thus externally synchronized. + * transport is in use. This method is called under a lock thus externally synchronized. If the + * return value is non-{@code null}, the runnable will be executed after releasing the lock. */ - public void onInUse(TransportSet ts) { } + @CheckReturnValue + public Runnable onInUse(TransportSet ts) { + return null; + } /** * Called when the TransportSet's in-use state has changed to false, which means no transport is diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java index 83249cdc6b..7e368a2f71 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java @@ -404,9 +404,7 @@ public class ManagedChannelImplIdlenessTest { } private void forceExitIdleMode() { - channel.exitIdleMode(); - // NameResolver is started in the scheduled executor - timer.runDueTasks(); + channel.exitIdleModeAndGetLb(); } private ClientTransport channelTmGetTransportUnwrapped(EquivalentAddressGroup addressGroup) { diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index c862cdc2a9..d47ccb51ab 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -167,9 +167,7 @@ public class ManagedChannelImplTest { ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE, executor.getScheduledExecutorService(), userAgent, interceptors, statsCtxFactory); // Force-exit the initial idle-mode - channel.exitIdleMode(); - // Will start NameResolver in the scheduled executor - assertEquals(1, timer.runDueTasks()); + channel.exitIdleModeAndGetLb(); } @Before diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java index 14f3c35874..beee4b781f 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java @@ -147,7 +147,7 @@ public class ManagedChannelImplTransportManagerTest { ArgumentCaptor> tmCaptor = ArgumentCaptor.forClass(null); // Force Channel to exit the initial idleness to get NameResolver and LoadBalancer created. - channel.exitIdleMode(); + channel.exitIdleModeAndGetLb(); verify(mockNameResolverFactory).newNameResolver(any(URI.class), any(Attributes.class)); verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture()); tm = tmCaptor.getValue();