mirror of https://github.com/grpc/grpc-java.git
core: Only use scheduled executor for timer tasks
This removes an abuse of scheduled executor in ManagedChannelImpl. The executor was used to avoid deadlocking. Now we run the work on the same thread, but delay it until locks have been released. There is no need to fix ManagedChannelImpl2. Due to its different threading model it didn't have need to abuse the scheduledExecutor. Fixes #2444
This commit is contained in:
parent
06c40dccc3
commit
e9fa8c99ce
|
|
@ -33,6 +33,7 @@ package io.grpc.internal;
|
|||
|
||||
import java.util.HashSet;
|
||||
|
||||
import javax.annotation.CheckReturnValue;
|
||||
import javax.annotation.concurrent.GuardedBy;
|
||||
|
||||
/**
|
||||
|
|
@ -44,15 +45,18 @@ abstract class InUseStateAggregator<T> {
|
|||
private final HashSet<T> inUseObjects = new HashSet<T>();
|
||||
|
||||
/**
|
||||
* Update the in-use state of an object. Initially no object is in use.
|
||||
* Update the in-use state of an object. Initially no object is in use. When the return value is
|
||||
* non-{@code null}, the caller should execute the runnable after releasing locks.
|
||||
*/
|
||||
final void updateObjectInUse(T object, boolean inUse) {
|
||||
@CheckReturnValue
|
||||
final Runnable updateObjectInUse(T object, boolean inUse) {
|
||||
Runnable runnable = null;
|
||||
synchronized (getLock()) {
|
||||
int origSize = inUseObjects.size();
|
||||
if (inUse) {
|
||||
inUseObjects.add(object);
|
||||
if (origSize == 0) {
|
||||
handleInUse();
|
||||
runnable = handleInUse();
|
||||
}
|
||||
} else {
|
||||
boolean removed = inUseObjects.remove(object);
|
||||
|
|
@ -61,8 +65,10 @@ abstract class InUseStateAggregator<T> {
|
|||
}
|
||||
}
|
||||
}
|
||||
return runnable;
|
||||
}
|
||||
|
||||
@CheckReturnValue
|
||||
final boolean isInUse() {
|
||||
synchronized (getLock()) {
|
||||
return !inUseObjects.isEmpty();
|
||||
|
|
@ -73,12 +79,13 @@ abstract class InUseStateAggregator<T> {
|
|||
|
||||
/**
|
||||
* Called when the aggregated in-use state has changed to true, which means at least one object is
|
||||
* in use.
|
||||
* in use. When the return value is non-{@code null}, then the runnable will be executed by the
|
||||
* caller of {@link #updateObjectInUse} after releasing locks.
|
||||
*
|
||||
* <p>This method is called under the lock returned by {@link #getLock}.
|
||||
*/
|
||||
@GuardedBy("getLock()")
|
||||
abstract void handleInUse();
|
||||
abstract Runnable handleInUse();
|
||||
|
||||
/**
|
||||
* Called when the aggregated in-use state has changed to false, which means no object is in use.
|
||||
|
|
|
|||
|
|
@ -197,8 +197,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
|
||||
@Override
|
||||
@GuardedBy("lock")
|
||||
void handleInUse() {
|
||||
exitIdleMode();
|
||||
Runnable handleInUse() {
|
||||
return exitIdleMode();
|
||||
}
|
||||
|
||||
@GuardedBy("lock")
|
||||
|
|
@ -262,38 +262,52 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
/**
|
||||
* Make the channel exit idle mode, if it's in it. Return a LoadBalancer that can be used for
|
||||
* making new requests. Return null if the channel is shutdown.
|
||||
*
|
||||
* <p>May be called under the lock.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
LoadBalancer<ClientTransport> exitIdleMode() {
|
||||
LoadBalancer<ClientTransport> exitIdleModeAndGetLb() {
|
||||
Runnable runnable;
|
||||
LoadBalancer<ClientTransport> balancer;
|
||||
synchronized (lock) {
|
||||
runnable = exitIdleMode();
|
||||
balancer = loadBalancer;
|
||||
}
|
||||
if (runnable != null) {
|
||||
runnable.run();
|
||||
}
|
||||
return balancer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Make the channel exit idle mode, if it's in it. If the returned runnable is non-{@code null},
|
||||
* then it should be executed by the caller after releasing {@code lock}.
|
||||
*/
|
||||
@GuardedBy("lock")
|
||||
private Runnable exitIdleMode() {
|
||||
final LoadBalancer<ClientTransport> balancer;
|
||||
final NameResolver resolver;
|
||||
synchronized (lock) {
|
||||
if (shutdown) {
|
||||
return null;
|
||||
}
|
||||
if (inUseStateAggregator.isInUse()) {
|
||||
cancelIdleTimer();
|
||||
} else {
|
||||
// exitIdleMode() may be called outside of inUseStateAggregator, which may still in
|
||||
// "not-in-use" state. If it's the case, we start the timer which will be soon cancelled if
|
||||
// the aggregator receives actual uses.
|
||||
rescheduleIdleTimer();
|
||||
}
|
||||
if (graceLoadBalancer != null) {
|
||||
// Exit grace period; timer already rescheduled above.
|
||||
loadBalancer = graceLoadBalancer;
|
||||
graceLoadBalancer = null;
|
||||
}
|
||||
if (loadBalancer != null) {
|
||||
return loadBalancer;
|
||||
}
|
||||
log.log(Level.FINE, "[{0}] Exiting idle mode", getLogId());
|
||||
balancer = loadBalancerFactory.newLoadBalancer(nameResolver.getServiceAuthority(), tm);
|
||||
this.loadBalancer = balancer;
|
||||
resolver = this.nameResolver;
|
||||
if (shutdown) {
|
||||
return null;
|
||||
}
|
||||
if (inUseStateAggregator.isInUse()) {
|
||||
cancelIdleTimer();
|
||||
} else {
|
||||
// exitIdleMode() may be called outside of inUseStateAggregator, which may still in
|
||||
// "not-in-use" state. If it's the case, we start the timer which will be soon cancelled if
|
||||
// the aggregator receives actual uses.
|
||||
rescheduleIdleTimer();
|
||||
}
|
||||
if (graceLoadBalancer != null) {
|
||||
// Exit grace period; timer already rescheduled above.
|
||||
loadBalancer = graceLoadBalancer;
|
||||
graceLoadBalancer = null;
|
||||
}
|
||||
if (loadBalancer != null) {
|
||||
return null;
|
||||
}
|
||||
log.log(Level.FINE, "[{0}] Exiting idle mode", getLogId());
|
||||
balancer = loadBalancerFactory.newLoadBalancer(nameResolver.getServiceAuthority(), tm);
|
||||
this.loadBalancer = balancer;
|
||||
resolver = this.nameResolver;
|
||||
class NameResolverStartTask implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
|
|
@ -308,8 +322,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
}
|
||||
}
|
||||
|
||||
scheduledExecutor.execute(new NameResolverStartTask());
|
||||
return balancer;
|
||||
return new NameResolverStartTask();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
@ -366,7 +379,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
LoadBalancer<ClientTransport> balancer = loadBalancer;
|
||||
if (balancer == null) {
|
||||
// Current state is either idle or in grace period
|
||||
balancer = exitIdleMode();
|
||||
balancer = exitIdleModeAndGetLb();
|
||||
}
|
||||
if (balancer == null) {
|
||||
return SHUTDOWN_TRANSPORT;
|
||||
|
|
@ -709,13 +722,14 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onInUse(TransportSet ts) {
|
||||
inUseStateAggregator.updateObjectInUse(ts, true);
|
||||
public Runnable onInUse(TransportSet ts) {
|
||||
return inUseStateAggregator.updateObjectInUse(ts, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNotInUse(TransportSet ts) {
|
||||
inUseStateAggregator.updateObjectInUse(ts, false);
|
||||
Runnable r = inUseStateAggregator.updateObjectInUse(ts, false);
|
||||
assert r == null;
|
||||
}
|
||||
});
|
||||
if (log.isLoggable(Level.FINE)) {
|
||||
|
|
@ -806,13 +820,17 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
delayedTransports.remove(delayedTransport);
|
||||
maybeTerminateChannel();
|
||||
}
|
||||
inUseStateAggregator.updateObjectInUse(delayedTransport, false);
|
||||
Runnable r = inUseStateAggregator.updateObjectInUse(delayedTransport, false);
|
||||
assert r == null;
|
||||
}
|
||||
|
||||
@Override public void transportReady() {}
|
||||
|
||||
@Override public void transportInUse(boolean inUse) {
|
||||
inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
|
||||
Runnable r = inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
|
||||
if (r != null) {
|
||||
r.run();
|
||||
}
|
||||
}
|
||||
});
|
||||
boolean savedShutdown;
|
||||
|
|
|
|||
|
|
@ -120,8 +120,8 @@ final class TransportSet extends ManagedChannel implements WithLogId {
|
|||
}
|
||||
|
||||
@Override
|
||||
void handleInUse() {
|
||||
callback.onInUse(TransportSet.this);
|
||||
Runnable handleInUse() {
|
||||
return callback.onInUse(TransportSet.this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -430,7 +430,10 @@ final class TransportSet extends ManagedChannel implements WithLogId {
|
|||
|
||||
@Override
|
||||
public void transportInUse(boolean inUse) {
|
||||
inUseStateAggregator.updateObjectInUse(transport, inUse);
|
||||
Runnable r = inUseStateAggregator.updateObjectInUse(transport, inUse);
|
||||
if (r != null) {
|
||||
r.run();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -439,7 +442,8 @@ final class TransportSet extends ManagedChannel implements WithLogId {
|
|||
@Override
|
||||
public void transportTerminated() {
|
||||
boolean runCallback = false;
|
||||
inUseStateAggregator.updateObjectInUse(transport, false);
|
||||
Runnable r = inUseStateAggregator.updateObjectInUse(transport, false);
|
||||
assert r == null;
|
||||
synchronized (lock) {
|
||||
transports.remove(transport);
|
||||
if (shutdown && transports.isEmpty()) {
|
||||
|
|
@ -585,9 +589,13 @@ final class TransportSet extends ManagedChannel implements WithLogId {
|
|||
|
||||
/**
|
||||
* Called when the TransportSet's in-use state has changed to true, which means at least one
|
||||
* transport is in use. This method is called under a lock thus externally synchronized.
|
||||
* transport is in use. This method is called under a lock thus externally synchronized. If the
|
||||
* return value is non-{@code null}, the runnable will be executed after releasing the lock.
|
||||
*/
|
||||
public void onInUse(TransportSet ts) { }
|
||||
@CheckReturnValue
|
||||
public Runnable onInUse(TransportSet ts) {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the TransportSet's in-use state has changed to false, which means no transport is
|
||||
|
|
|
|||
|
|
@ -404,9 +404,7 @@ public class ManagedChannelImplIdlenessTest {
|
|||
}
|
||||
|
||||
private void forceExitIdleMode() {
|
||||
channel.exitIdleMode();
|
||||
// NameResolver is started in the scheduled executor
|
||||
timer.runDueTasks();
|
||||
channel.exitIdleModeAndGetLb();
|
||||
}
|
||||
|
||||
private ClientTransport channelTmGetTransportUnwrapped(EquivalentAddressGroup addressGroup) {
|
||||
|
|
|
|||
|
|
@ -167,9 +167,7 @@ public class ManagedChannelImplTest {
|
|||
ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE,
|
||||
executor.getScheduledExecutorService(), userAgent, interceptors, statsCtxFactory);
|
||||
// Force-exit the initial idle-mode
|
||||
channel.exitIdleMode();
|
||||
// Will start NameResolver in the scheduled executor
|
||||
assertEquals(1, timer.runDueTasks());
|
||||
channel.exitIdleModeAndGetLb();
|
||||
}
|
||||
|
||||
@Before
|
||||
|
|
|
|||
|
|
@ -147,7 +147,7 @@ public class ManagedChannelImplTransportManagerTest {
|
|||
ArgumentCaptor<TransportManager<ClientTransport>> tmCaptor
|
||||
= ArgumentCaptor.forClass(null);
|
||||
// Force Channel to exit the initial idleness to get NameResolver and LoadBalancer created.
|
||||
channel.exitIdleMode();
|
||||
channel.exitIdleModeAndGetLb();
|
||||
verify(mockNameResolverFactory).newNameResolver(any(URI.class), any(Attributes.class));
|
||||
verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture());
|
||||
tm = tmCaptor.getValue();
|
||||
|
|
|
|||
Loading…
Reference in New Issue