From 4d2b3e3d066392548d7ed8f35e4fa65d86c92d9c Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Mon, 27 Jul 2015 15:00:01 -0700 Subject: [PATCH] Remove Intrinsic locking in ChannelImpl. --- core/src/main/java/io/grpc/ChannelImpl.java | 86 ++++++++++++--------- 1 file changed, 48 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/io/grpc/ChannelImpl.java b/core/src/main/java/io/grpc/ChannelImpl.java index 8c29166d98..cfacf08a98 100644 --- a/core/src/main/java/io/grpc/ChannelImpl.java +++ b/core/src/main/java/io/grpc/ChannelImpl.java @@ -62,6 +62,7 @@ public final class ChannelImpl extends Channel { private final ClientTransportFactory transportFactory; private final ExecutorService executor; private final String userAgent; + private final Object lock = new Object(); /** * Executor that runs deadline timers for requests. @@ -77,16 +78,16 @@ public final class ChannelImpl extends Channel { * present, but previously used transports that still have streams or are stopping may also be * present. */ - @GuardedBy("this") + @GuardedBy("lock") private Collection transports = new ArrayList(); /** * The transport for new outgoing requests. 'this' lock must be held when assigning to * activeTransport. */ private volatile ClientTransport activeTransport; - @GuardedBy("this") + @GuardedBy("lock") private boolean shutdown; - @GuardedBy("this") + @GuardedBy("lock") private boolean terminated; private Runnable terminationRunnable; @@ -116,25 +117,27 @@ public final class ChannelImpl extends Channel { * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately * cancelled. */ - public synchronized ChannelImpl shutdown() { - if (shutdown) { + public ChannelImpl shutdown() { + synchronized (lock) { + if (shutdown) { + return this; + } + shutdown = true; + // After shutdown there are no new calls, so no new cancellation tasks are needed + deadlineCancellationExecutor = + SharedResourceHolder.release(TIMER_SERVICE, deadlineCancellationExecutor); + if (activeTransport != null) { + activeTransport.shutdown(); + activeTransport = null; + } else if (transports.isEmpty()) { + terminated = true; + lock.notifyAll(); + if (terminationRunnable != null) { + terminationRunnable.run(); + } + } return this; } - shutdown = true; - // After shutdown there are no new calls, so no new cancellation tasks are needed - deadlineCancellationExecutor = - SharedResourceHolder.release(TIMER_SERVICE, deadlineCancellationExecutor); - if (activeTransport != null) { - activeTransport.shutdown(); - activeTransport = null; - } else if (transports.isEmpty()) { - terminated = true; - notifyAll(); - if (terminationRunnable != null) { - terminationRunnable.run(); - } - } - return this; } /** @@ -145,9 +148,11 @@ public final class ChannelImpl extends Channel { *

NOT YET IMPLEMENTED. This method currently behaves identically to shutdown(). */ // TODO(ejona86): cancel preexisting calls. - public synchronized ChannelImpl shutdownNow() { - shutdown(); - return this; + public ChannelImpl shutdownNow() { + synchronized (lock) { + shutdown(); + return this; + } } /** @@ -157,8 +162,10 @@ public final class ChannelImpl extends Channel { * @see #shutdown() * @see #isTerminated() */ - public synchronized boolean isShutdown() { - return shutdown; + public boolean isShutdown() { + synchronized (lock) { + return shutdown; + } } /** @@ -166,14 +173,15 @@ public final class ChannelImpl extends Channel { * * @return whether the channel is terminated, as would be done by {@link #isTerminated()}. */ - public synchronized boolean awaitTerminated(long timeout, TimeUnit unit) - throws InterruptedException { - long timeoutNanos = unit.toNanos(timeout); - long endTimeNanos = System.nanoTime() + timeoutNanos; - while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) { - TimeUnit.NANOSECONDS.timedWait(this, timeoutNanos); + public boolean awaitTerminated(long timeout, TimeUnit unit) throws InterruptedException { + synchronized (lock) { + long timeoutNanos = unit.toNanos(timeout); + long endTimeNanos = System.nanoTime() + timeoutNanos; + while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) { + TimeUnit.NANOSECONDS.timedWait(lock, timeoutNanos); + } + return terminated; } - return terminated; } /** @@ -182,8 +190,10 @@ public final class ChannelImpl extends Channel { * * @see #isShutdown() */ - public synchronized boolean isTerminated() { - return terminated; + public boolean isTerminated() { + synchronized (lock) { + return terminated; + } } /** @@ -224,7 +234,7 @@ public final class ChannelImpl extends Channel { if (savedActiveTransport != null) { return savedActiveTransport; } - synchronized (this) { + synchronized (lock) { if (shutdown) { return null; } @@ -277,7 +287,7 @@ public final class ChannelImpl extends Channel { @Override public void transportShutdown() { - synchronized (ChannelImpl.this) { + synchronized (lock) { if (activeTransport == transport) { activeTransport = null; } @@ -286,7 +296,7 @@ public final class ChannelImpl extends Channel { @Override public void transportTerminated() { - synchronized (ChannelImpl.this) { + synchronized (lock) { if (activeTransport == transport) { log.warning("transportTerminated called without previous transportShutdown"); activeTransport = null; @@ -298,7 +308,7 @@ public final class ChannelImpl extends Channel { log.warning("transportTerminated called after already terminated"); } terminated = true; - ChannelImpl.this.notifyAll(); + lock.notifyAll(); if (terminationRunnable != null) { terminationRunnable.run(); }