diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelOrphanWrapper.java b/core/src/main/java/io/grpc/internal/ManagedChannelOrphanWrapper.java index 8ae83de0cd..61f3fb9005 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelOrphanWrapper.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelOrphanWrapper.java @@ -24,7 +24,6 @@ import java.lang.ref.SoftReference; import java.lang.ref.WeakReference; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.LogRecord; import java.util.logging.Logger; @@ -56,24 +55,17 @@ final class ManagedChannelOrphanWrapper extends ForwardingManagedChannel { @Override public ManagedChannel shutdown() { phantom.shutdown = true; + phantom.clear(); return super.shutdown(); } @Override public ManagedChannel shutdownNow() { - phantom.shutdownNow = true; + phantom.shutdown = true; + phantom.clear(); return super.shutdownNow(); } - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - boolean ret = super.awaitTermination(timeout, unit); - if (ret) { - phantom.clear(); - } - return ret; - } - @VisibleForTesting static final class ManagedChannelReference extends WeakReference { @@ -87,10 +79,9 @@ final class ManagedChannelOrphanWrapper extends ForwardingManagedChannel { private final ReferenceQueue refqueue; private final ConcurrentMap refs; - private final ManagedChannel channel; + private final String channelStr; private final Reference allocationSite; private volatile boolean shutdown; - private volatile boolean shutdownNow; ManagedChannelReference( ManagedChannelOrphanWrapper orphanable, @@ -102,7 +93,7 @@ final class ManagedChannelOrphanWrapper extends ForwardingManagedChannel { ENABLE_ALLOCATION_TRACKING ? new RuntimeException("ManagedChannel allocation site") : missingCallSite); - this.channel = channel; + this.channelStr = channel.toString(); this.refqueue = refqueue; this.refs = refs; this.refs.put(this, this); @@ -144,21 +135,18 @@ final class ManagedChannelOrphanWrapper extends ForwardingManagedChannel { while ((ref = (ManagedChannelReference) refqueue.poll()) != null) { RuntimeException maybeAllocationSite = ref.allocationSite.get(); ref.clearInternal(); // technically the reference is gone already. - if (!(ref.shutdown && ref.channel.isTerminated())) { + if (!ref.shutdown) { orphanedChannels++; - Level level = ref.shutdownNow ? Level.FINE : Level.SEVERE; + Level level = Level.SEVERE; if (logger.isLoggable(level)) { String fmt = - "*~*~*~ Channel {0} was not " - // Prefer to complain about shutdown if neither has been called. - + (!ref.shutdown ? "shutdown" : "terminated") - + " properly!!! ~*~*~*" - + System.getProperty("line.separator") - + " Make sure to call shutdown()/shutdownNow() and wait " - + "until awaitTermination() returns true."; + "*~*~*~ Channel {0} was not shutdown properly!!! ~*~*~*" + + System.getProperty("line.separator") + + " Make sure to call shutdown()/shutdownNow() and wait " + + "until awaitTermination() returns true."; LogRecord lr = new LogRecord(level, fmt); lr.setLoggerName(logger.getName()); - lr.setParameters(new Object[]{ref.channel.toString()}); + lr.setParameters(new Object[] {ref.channelStr}); lr.setThrown(maybeAllocationSite); logger.log(lr); } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelOrphanWrapperTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelOrphanWrapperTest.java index 9f9dc276e7..da89c2778f 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelOrphanWrapperTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelOrphanWrapperTest.java @@ -18,20 +18,22 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; +import com.google.common.testing.GcFinalization; +import com.google.common.testing.GcFinalization.FinalizationPredicate; import io.grpc.CallOptions; import io.grpc.ClientCall; import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; import io.grpc.internal.ManagedChannelOrphanWrapper.ManagedChannelReference; import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Filter; import java.util.logging.Level; import java.util.logging.LogRecord; @@ -43,10 +45,10 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public final class ManagedChannelOrphanWrapperTest { @Test - public void orphanedChannelsAreLogged() throws Exception { - ManagedChannel mc = mock(ManagedChannel.class); + public void orphanedChannelsAreLogged() { + ManagedChannel mc = new TestManagedChannel(); String channelString = mc.toString(); - ReferenceQueue refqueue = + final ReferenceQueue refqueue = new ReferenceQueue(); ConcurrentMap refs = new ConcurrentHashMap(); @@ -71,22 +73,18 @@ public final class ManagedChannelOrphanWrapperTest { } }); - // TODO(carl-mastrangelo): consider using com.google.common.testing.GcFinalization instead. try { channel = null; - boolean success = false; - for (int retry = 0; retry < 3; retry++) { - System.gc(); - System.runFinalization(); - int orphans = ManagedChannelReference.cleanQueue(refqueue); - if (orphans == 1) { - success = true; - break; - } - assertEquals("unexpected extra orphans", 0, orphans); - Thread.sleep(100L * (1L << retry)); - } - assertTrue("Channel was not garbage collected", success); + final AtomicInteger numOrphans = new AtomicInteger(); + GcFinalization.awaitDone( + new FinalizationPredicate() { + @Override + public boolean isDone() { + numOrphans.getAndAdd(ManagedChannelReference.cleanQueue(refqueue)); + return numOrphans.get() > 0; + } + }); + assertEquals("unexpected extra orphans", 1, numOrphans.get()); LogRecord lr; synchronized (records) { @@ -102,7 +100,32 @@ public final class ManagedChannelOrphanWrapperTest { } } - private static final class TestManagedChannel extends ManagedChannel { + @Test + public void refCycleIsGCed() { + ReferenceQueue refqueue = + new ReferenceQueue(); + ConcurrentMap refs = + new ConcurrentHashMap(); + ApplicationWithChannelRef app = new ApplicationWithChannelRef(); + ChannelWithApplicationRef channelImpl = new ChannelWithApplicationRef(); + ManagedChannelOrphanWrapper channel = + new ManagedChannelOrphanWrapper(channelImpl, refqueue, refs); + app.channel = channel; + channelImpl.application = app; + WeakReference appWeakRef = + new WeakReference(app); + + // Simulate the application and channel going out of scope. A ref cycle between app and + // channel remains, so ensure that our tracking of orphaned channels does not prevent this + // reference cycle from being GCed. + channel = null; + app = null; + channelImpl = null; + + GcFinalization.awaitClear(appWeakRef); + } + + private static class TestManagedChannel extends ManagedChannel { @Override public ManagedChannel shutdown() { return null; @@ -139,4 +162,12 @@ public final class ManagedChannelOrphanWrapperTest { return null; } } + + private static final class ApplicationWithChannelRef { + private ManagedChannel channel; + } + + private static final class ChannelWithApplicationRef extends TestManagedChannel { + private ApplicationWithChannelRef application; + } }