Remove Intrinsic locking in ChannelImpl.

This commit is contained in:
Carl Mastrangelo 2015-07-27 15:00:01 -07:00
parent e36a64e6a6
commit 4d2b3e3d06
1 changed files with 48 additions and 38 deletions

View File

@ -62,6 +62,7 @@ public final class ChannelImpl extends Channel {
private final ClientTransportFactory transportFactory; private final ClientTransportFactory transportFactory;
private final ExecutorService executor; private final ExecutorService executor;
private final String userAgent; private final String userAgent;
private final Object lock = new Object();
/** /**
* Executor that runs deadline timers for requests. * Executor that runs deadline timers for requests.
@ -77,16 +78,16 @@ public final class ChannelImpl extends Channel {
* present, but previously used transports that still have streams or are stopping may also be * present, but previously used transports that still have streams or are stopping may also be
* present. * present.
*/ */
@GuardedBy("this") @GuardedBy("lock")
private Collection<ClientTransport> transports = new ArrayList<ClientTransport>(); private Collection<ClientTransport> transports = new ArrayList<ClientTransport>();
/** /**
* The transport for new outgoing requests. 'this' lock must be held when assigning to * The transport for new outgoing requests. 'this' lock must be held when assigning to
* activeTransport. * activeTransport.
*/ */
private volatile ClientTransport activeTransport; private volatile ClientTransport activeTransport;
@GuardedBy("this") @GuardedBy("lock")
private boolean shutdown; private boolean shutdown;
@GuardedBy("this") @GuardedBy("lock")
private boolean terminated; private boolean terminated;
private Runnable terminationRunnable; private Runnable terminationRunnable;
@ -116,7 +117,8 @@ public final class ChannelImpl extends Channel {
* Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
* cancelled. * cancelled.
*/ */
public synchronized ChannelImpl shutdown() { public ChannelImpl shutdown() {
synchronized (lock) {
if (shutdown) { if (shutdown) {
return this; return this;
} }
@ -129,13 +131,14 @@ public final class ChannelImpl extends Channel {
activeTransport = null; activeTransport = null;
} else if (transports.isEmpty()) { } else if (transports.isEmpty()) {
terminated = true; terminated = true;
notifyAll(); lock.notifyAll();
if (terminationRunnable != null) { if (terminationRunnable != null) {
terminationRunnable.run(); terminationRunnable.run();
} }
} }
return this; return this;
} }
}
/** /**
* Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
@ -145,10 +148,12 @@ public final class ChannelImpl extends Channel {
* <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown(). * <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown().
*/ */
// TODO(ejona86): cancel preexisting calls. // TODO(ejona86): cancel preexisting calls.
public synchronized ChannelImpl shutdownNow() { public ChannelImpl shutdownNow() {
synchronized (lock) {
shutdown(); shutdown();
return this; return this;
} }
}
/** /**
* Returns whether the channel is shutdown. Shutdown channels immediately cancel any new calls, * Returns whether the channel is shutdown. Shutdown channels immediately cancel any new calls,
@ -157,24 +162,27 @@ public final class ChannelImpl extends Channel {
* @see #shutdown() * @see #shutdown()
* @see #isTerminated() * @see #isTerminated()
*/ */
public synchronized boolean isShutdown() { public boolean isShutdown() {
synchronized (lock) {
return shutdown; return shutdown;
} }
}
/** /**
* Waits for the channel to become terminated, giving up if the timeout is reached. * Waits for the channel to become terminated, giving up if the timeout is reached.
* *
* @return whether the channel is terminated, as would be done by {@link #isTerminated()}. * @return whether the channel is terminated, as would be done by {@link #isTerminated()}.
*/ */
public synchronized boolean awaitTerminated(long timeout, TimeUnit unit) public boolean awaitTerminated(long timeout, TimeUnit unit) throws InterruptedException {
throws InterruptedException { synchronized (lock) {
long timeoutNanos = unit.toNanos(timeout); long timeoutNanos = unit.toNanos(timeout);
long endTimeNanos = System.nanoTime() + timeoutNanos; long endTimeNanos = System.nanoTime() + timeoutNanos;
while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) { while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) {
TimeUnit.NANOSECONDS.timedWait(this, timeoutNanos); TimeUnit.NANOSECONDS.timedWait(lock, timeoutNanos);
} }
return terminated; return terminated;
} }
}
/** /**
* Returns whether the channel is terminated. Terminated channels have no running calls and * Returns whether the channel is terminated. Terminated channels have no running calls and
@ -182,9 +190,11 @@ public final class ChannelImpl extends Channel {
* *
* @see #isShutdown() * @see #isShutdown()
*/ */
public synchronized boolean isTerminated() { public boolean isTerminated() {
synchronized (lock) {
return terminated; return terminated;
} }
}
/** /**
* Pings the remote endpoint to verify that the transport is still active. When an acknowledgement * Pings the remote endpoint to verify that the transport is still active. When an acknowledgement
@ -224,7 +234,7 @@ public final class ChannelImpl extends Channel {
if (savedActiveTransport != null) { if (savedActiveTransport != null) {
return savedActiveTransport; return savedActiveTransport;
} }
synchronized (this) { synchronized (lock) {
if (shutdown) { if (shutdown) {
return null; return null;
} }
@ -277,7 +287,7 @@ public final class ChannelImpl extends Channel {
@Override @Override
public void transportShutdown() { public void transportShutdown() {
synchronized (ChannelImpl.this) { synchronized (lock) {
if (activeTransport == transport) { if (activeTransport == transport) {
activeTransport = null; activeTransport = null;
} }
@ -286,7 +296,7 @@ public final class ChannelImpl extends Channel {
@Override @Override
public void transportTerminated() { public void transportTerminated() {
synchronized (ChannelImpl.this) { synchronized (lock) {
if (activeTransport == transport) { if (activeTransport == transport) {
log.warning("transportTerminated called without previous transportShutdown"); log.warning("transportTerminated called without previous transportShutdown");
activeTransport = null; activeTransport = null;
@ -298,7 +308,7 @@ public final class ChannelImpl extends Channel {
log.warning("transportTerminated called after already terminated"); log.warning("transportTerminated called after already terminated");
} }
terminated = true; terminated = true;
ChannelImpl.this.notifyAll(); lock.notifyAll();
if (terminationRunnable != null) { if (terminationRunnable != null) {
terminationRunnable.run(); terminationRunnable.run();
} }