From 5cc71f1de95bb8b0902cbb9d3eebe31912dc7287 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Wed, 6 Mar 2019 11:49:42 -0800 Subject: [PATCH] netty, core: pass log-only channel logger into transport --- .../io/grpc/internal/ChannelLoggerImpl.java | 40 +-- .../java/io/grpc/internal/ChannelTracer.java | 4 +- .../io/grpc/internal/InternalSubchannel.java | 26 +- .../grpc/internal/InternalSubchannelTest.java | 273 +++++++++--------- .../grpc/internal/ManagedChannelImplTest.java | 19 +- .../io/grpc/netty/NettyChannelBuilder.java | 2 +- .../io/grpc/netty/NettyClientTransport.java | 5 +- .../grpc/netty/NettyClientTransportTest.java | 17 +- 8 files changed, 219 insertions(+), 167 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ChannelLoggerImpl.java b/core/src/main/java/io/grpc/internal/ChannelLoggerImpl.java index 23e3a674ff..e000d872e8 100644 --- a/core/src/main/java/io/grpc/internal/ChannelLoggerImpl.java +++ b/core/src/main/java/io/grpc/internal/ChannelLoggerImpl.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import io.grpc.ChannelLogger; import io.grpc.InternalChannelz.ChannelTrace.Event; import io.grpc.InternalChannelz.ChannelTrace.Event.Severity; +import io.grpc.InternalLogId; import java.text.MessageFormat; import java.util.logging.Level; @@ -35,10 +36,7 @@ final class ChannelLoggerImpl extends ChannelLogger { @Override public void log(ChannelLogLevel level, String msg) { - Level javaLogLevel = toJavaLogLevel(level); - if (ChannelTracer.logger.isLoggable(javaLogLevel)) { - tracer.logOnly(javaLogLevel, msg); - } + logOnly(tracer.getLogId(), level, msg); if (isTraceable(level)) { trace(level, msg); } @@ -46,19 +44,27 @@ final class ChannelLoggerImpl extends ChannelLogger { @Override public void log(ChannelLogLevel level, String messageFormat, Object... args) { - Level javaLogLevel = toJavaLogLevel(level); String msg = null; - if (ChannelTracer.logger.isLoggable(javaLogLevel)) { - if (msg == null) { - msg = MessageFormat.format(messageFormat, args); - } - tracer.logOnly(javaLogLevel, msg); + Level javaLogLevel = toJavaLogLevel(level); + if (isTraceable(level) || ChannelTracer.logger.isLoggable(javaLogLevel)) { + msg = MessageFormat.format(messageFormat, args); } - if (isTraceable(level)) { - if (msg == null) { - msg = MessageFormat.format(messageFormat, args); - } - trace(level, msg); + log(level, msg); + } + + static void logOnly(InternalLogId logId, ChannelLogLevel level, String msg) { + Level javaLogLevel = toJavaLogLevel(level); + if (ChannelTracer.logger.isLoggable(javaLogLevel)) { + ChannelTracer.logOnly(logId, javaLogLevel, msg); + } + } + + static void logOnly( + InternalLogId logId, ChannelLogLevel level, String messageFormat, Object... args) { + Level javaLogLevel = toJavaLogLevel(level); + if (ChannelTracer.logger.isLoggable(javaLogLevel)) { + String msg = MessageFormat.format(messageFormat, args); + ChannelTracer.logOnly(logId, javaLogLevel, msg); } } @@ -77,7 +83,7 @@ final class ChannelLoggerImpl extends ChannelLogger { .build()); } - private Severity toTracerSeverity(ChannelLogLevel level) { + private static Severity toTracerSeverity(ChannelLogLevel level) { switch (level) { case ERROR: return Severity.CT_ERROR; @@ -88,7 +94,7 @@ final class ChannelLoggerImpl extends ChannelLogger { } } - private Level toJavaLogLevel(ChannelLogLevel level) { + private static Level toJavaLogLevel(ChannelLogLevel level) { switch (level) { case ERROR: return Level.FINE; diff --git a/core/src/main/java/io/grpc/internal/ChannelTracer.java b/core/src/main/java/io/grpc/internal/ChannelTracer.java index 38d37773aa..4e39c7f3ed 100644 --- a/core/src/main/java/io/grpc/internal/ChannelTracer.java +++ b/core/src/main/java/io/grpc/internal/ChannelTracer.java @@ -102,7 +102,7 @@ final class ChannelTracer { logLevel = Level.FINEST; } traceOnly(event); - logOnly(logLevel, event.description); + logOnly(logId, logLevel, event.description); } boolean isTraceEnabled() { @@ -119,7 +119,7 @@ final class ChannelTracer { } } - void logOnly(Level logLevel, String msg) { + static void logOnly(InternalLogId logId, Level logLevel, String msg) { if (logger.isLoggable(logLevel)) { logger.log(logLevel, "[" + logId + "] " + msg); } diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 8fcda2d262..2d60d15791 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -78,7 +78,7 @@ final class InternalSubchannel implements InternalInstrumented { private final InternalChannelz channelz; private final CallTracer callsTracer; private final ChannelTracer channelTracer; - private final ChannelLogger channelLogger; + private final ChannelLoggerImpl channelLogger; // File-specific convention: methods without GuardedBy("lock") MUST NOT be called under the lock. private final Object lock = new Object(); @@ -258,9 +258,13 @@ final class InternalSubchannel implements InternalInstrumented { .setEagAttributes(addressIndex.getCurrentEagAttributes()) .setUserAgent(userAgent) .setHttpConnectProxiedSocketAddress(proxiedAddr); + TransportLogger transportLogger = new TransportLogger(); + // In case the transport logs in the constructor, use the subchannel logId + transportLogger.logId = getLogId(); ConnectionClientTransport transport = new CallTracingTransport( - transportFactory.newClientTransport(address, options, channelLogger), callsTracer); + transportFactory.newClientTransport(address, options, transportLogger), callsTracer); + transportLogger.logId = transport.getLogId(); channelz.addClientSocket(transport); pendingTransport = transport; transports.add(transport); @@ -268,6 +272,7 @@ final class InternalSubchannel implements InternalInstrumented { if (runnable != null) { syncContext.executeLater(runnable); } + channelLogger.log(ChannelLogLevel.INFO, "Started transport {0}", transportLogger.logId); } /** @@ -505,7 +510,6 @@ final class InternalSubchannel implements InternalInstrumented { return logId; } - @Override public ListenableFuture getStats() { SettableFuture ret = SettableFuture.create(); @@ -797,4 +801,20 @@ final class InternalSubchannel implements InternalInstrumented { } return buffer.toString(); } + + @VisibleForTesting + static final class TransportLogger extends ChannelLogger { + // Changed just after construction to break a cyclic dependency. + InternalLogId logId; + + @Override + public void log(ChannelLogLevel level, String message) { + ChannelLoggerImpl.logOnly(logId, level, message); + } + + @Override + public void log(ChannelLogLevel level, String messageFormat, Object... args) { + ChannelLoggerImpl.logOnly(logId, level, messageFormat, args); + } + } } diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index 5e7c021493..beed0187c8 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -29,6 +29,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -48,6 +50,7 @@ import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.internal.InternalSubchannel.CallTracingTransport; import io.grpc.internal.InternalSubchannel.Index; +import io.grpc.internal.InternalSubchannel.TransportLogger; import io.grpc.internal.TestUtils.MockClientTransportInfo; import java.net.SocketAddress; import java.util.Arrays; @@ -168,9 +171,9 @@ public class InternalSubchannelTest { assertNull(internalSubchannel.obtainActiveTransport()); assertEquals(CONNECTING, internalSubchannel.getState()); verify(mockTransportFactory).newClientTransport( - addr, - createClientTransportOptions().setEagAttributes(attr), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions().setEagAttributes(attr)), + isA(TransportLogger.class)); } @Test public void singleAddressReconnect() { @@ -192,9 +195,9 @@ public class InternalSubchannelTest { assertEquals(CONNECTING, internalSubchannel.getState()); verify(mockTransportFactory, times(++transportsCreated)) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Fail this one. Because there is only one address to try, enter TRANSIENT_FAILURE. assertNoCallbackInvoke(); @@ -211,9 +214,9 @@ public class InternalSubchannelTest { assertNull(internalSubchannel.obtainActiveTransport()); verify(mockTransportFactory, times(transportsCreated)) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertNoCallbackInvoke(); @@ -222,9 +225,9 @@ public class InternalSubchannelTest { assertEquals(CONNECTING, internalSubchannel.getState()); verify(mockTransportFactory, times(++transportsCreated)) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Fail this one too assertNoCallbackInvoke(); // Here we use a different status from the first failure, and verify that it's passed to @@ -242,9 +245,9 @@ public class InternalSubchannelTest { assertNull(internalSubchannel.obtainActiveTransport()); verify(mockTransportFactory, times(transportsCreated)) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertNoCallbackInvoke(); fakeClock.forwardNanos(1); @@ -253,9 +256,9 @@ public class InternalSubchannelTest { assertNull(internalSubchannel.obtainActiveTransport()); verify(mockTransportFactory, times(++transportsCreated)) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Let this one succeed, will enter READY state. assertNoCallbackInvoke(); transports.peek().listener.transportReady(); @@ -278,9 +281,9 @@ public class InternalSubchannelTest { verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsCreated)) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Final checks for consultations on back-off policies verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos(); @@ -307,9 +310,9 @@ public class InternalSubchannelTest { assertEquals(CONNECTING, internalSubchannel.getState()); verify(mockTransportFactory, times(++transportsAddr1)) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Let this one fail without success transports.poll().listener.transportShutdown(Status.UNAVAILABLE); @@ -322,9 +325,9 @@ public class InternalSubchannelTest { verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr2)) .newClientTransport( - addr2, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr2), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); assertNull(internalSubchannel.obtainActiveTransport()); // Fail this one too assertNoCallbackInvoke(); @@ -345,9 +348,9 @@ public class InternalSubchannelTest { fakeClock.forwardNanos(9); verify(mockTransportFactory, times(transportsAddr1)) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertNoCallbackInvoke(); fakeClock.forwardNanos(1); @@ -355,9 +358,9 @@ public class InternalSubchannelTest { assertEquals(CONNECTING, internalSubchannel.getState()); verify(mockTransportFactory, times(++transportsAddr1)) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Fail this one too transports.poll().listener.transportShutdown(Status.UNAVAILABLE); assertEquals(CONNECTING, internalSubchannel.getState()); @@ -368,9 +371,9 @@ public class InternalSubchannelTest { verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr2)) .newClientTransport( - addr2, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr2), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Fail this one too assertNoCallbackInvoke(); transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED); @@ -386,9 +389,9 @@ public class InternalSubchannelTest { fakeClock.forwardNanos(99); verify(mockTransportFactory, times(transportsAddr1)) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertNoCallbackInvoke(); fakeClock.forwardNanos(1); @@ -396,9 +399,9 @@ public class InternalSubchannelTest { assertEquals(CONNECTING, internalSubchannel.getState()); verify(mockTransportFactory, times(++transportsAddr1)) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Let it through assertNoCallbackInvoke(); transports.peek().listener.transportReady(); @@ -422,9 +425,9 @@ public class InternalSubchannelTest { verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr1)) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Fail the transport transports.poll().listener.transportShutdown(Status.UNAVAILABLE); assertEquals(CONNECTING, internalSubchannel.getState()); @@ -433,9 +436,9 @@ public class InternalSubchannelTest { verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr2)) .newClientTransport( - addr2, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr2), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Fail this one too assertEquals(CONNECTING, internalSubchannel.getState()); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); @@ -450,9 +453,9 @@ public class InternalSubchannelTest { fakeClock.forwardNanos(9); verify(mockTransportFactory, times(transportsAddr1)) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertNoCallbackInvoke(); fakeClock.forwardNanos(1); @@ -460,9 +463,9 @@ public class InternalSubchannelTest { assertEquals(CONNECTING, internalSubchannel.getState()); verify(mockTransportFactory, times(++transportsAddr1)) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Final checks on invocations on back-off policies verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos(); @@ -499,18 +502,18 @@ public class InternalSubchannelTest { assertExactCallbackInvokes("onStateChange:CONNECTING"); verify(mockTransportFactory) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); assertEquals(CONNECTING, internalSubchannel.getState()); // Second address connects verify(mockTransportFactory) .newClientTransport( - addr2, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr2), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.peek().listener.transportReady(); assertExactCallbackInvokes("onStateChange:READY"); assertEquals(READY, internalSubchannel.getState()); @@ -531,15 +534,15 @@ public class InternalSubchannelTest { assertEquals(0, fakeClock.numPendingTasks()); verify(mockTransportFactory, times(2)) .newClientTransport( - addr2, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr2), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); verify(mockTransportFactory) .newClientTransport( - addr3, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr3), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); verifyNoMoreInteractions(mockTransportFactory); @@ -558,18 +561,18 @@ public class InternalSubchannelTest { assertExactCallbackInvokes("onStateChange:CONNECTING"); verify(mockTransportFactory) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); assertEquals(CONNECTING, internalSubchannel.getState()); // Second address connecting verify(mockTransportFactory) .newClientTransport( - addr2, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr2), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); assertNoCallbackInvoke(); assertEquals(CONNECTING, internalSubchannel.getState()); @@ -591,15 +594,15 @@ public class InternalSubchannelTest { assertEquals(0, fakeClock.numPendingTasks()); verify(mockTransportFactory, times(2)) .newClientTransport( - addr2, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr2), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); verify(mockTransportFactory) .newClientTransport( - addr3, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr3), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); verifyNoMoreInteractions(mockTransportFactory); @@ -616,14 +619,14 @@ public class InternalSubchannelTest { // Nothing happened on address update verify(mockTransportFactory, never()) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); verify(mockTransportFactory, never()) .newClientTransport( - addr2, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr2), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); verifyNoMoreInteractions(mockTransportFactory); assertNoCallbackInvoke(); assertEquals(IDLE, internalSubchannel.getState()); @@ -633,9 +636,9 @@ public class InternalSubchannelTest { assertExactCallbackInvokes("onStateChange:CONNECTING"); verify(mockTransportFactory) .newClientTransport( - addr2, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr2), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // And no other addresses attempted assertEquals(0, fakeClock.numPendingTasks()); @@ -660,18 +663,18 @@ public class InternalSubchannelTest { assertExactCallbackInvokes("onStateChange:CONNECTING"); verify(mockTransportFactory) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); assertEquals(CONNECTING, internalSubchannel.getState()); // Second address connects verify(mockTransportFactory) .newClientTransport( - addr2, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr2), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.peek().listener.transportReady(); assertExactCallbackInvokes("onStateChange:READY"); assertEquals(READY, internalSubchannel.getState()); @@ -692,15 +695,15 @@ public class InternalSubchannelTest { assertEquals(0, fakeClock.numPendingTasks()); verify(mockTransportFactory) .newClientTransport( - addr3, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr3), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); verify(mockTransportFactory) .newClientTransport( - addr4, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr4), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); verifyNoMoreInteractions(mockTransportFactory); @@ -720,18 +723,18 @@ public class InternalSubchannelTest { assertExactCallbackInvokes("onStateChange:CONNECTING"); verify(mockTransportFactory) .newClientTransport( - addr1, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr1), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); assertEquals(CONNECTING, internalSubchannel.getState()); // Second address connecting verify(mockTransportFactory) .newClientTransport( - addr2, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr2), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); assertNoCallbackInvoke(); assertEquals(CONNECTING, internalSubchannel.getState()); @@ -750,15 +753,15 @@ public class InternalSubchannelTest { assertEquals(0, fakeClock.numPendingTasks()); verify(mockTransportFactory) .newClientTransport( - addr3, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr3), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); verify(mockTransportFactory) .newClientTransport( - addr4, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr4), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); verifyNoMoreInteractions(mockTransportFactory); @@ -776,18 +779,18 @@ public class InternalSubchannelTest { // Won't connect until requested verify(mockTransportFactory, times(transportsCreated)) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // First attempt internalSubchannel.obtainActiveTransport(); assertExactCallbackInvokes("onStateChange:CONNECTING"); verify(mockTransportFactory, times(++transportsCreated)) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Fail this one transports.poll().listener.transportShutdown(Status.UNAVAILABLE); @@ -798,9 +801,9 @@ public class InternalSubchannelTest { assertExactCallbackInvokes("onStateChange:CONNECTING"); verify(mockTransportFactory, times(++transportsCreated)) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Make this one proceed transports.peek().listener.transportReady(); @@ -818,9 +821,9 @@ public class InternalSubchannelTest { assertExactCallbackInvokes("onStateChange:CONNECTING"); verify(mockTransportFactory, times(++transportsCreated)) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); } @Test @@ -852,9 +855,9 @@ public class InternalSubchannelTest { assertExactCallbackInvokes("onStateChange:CONNECTING"); verify(mockTransportFactory) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); // Fail this one MockClientTransportInfo transportInfo = transports.poll(); @@ -1076,9 +1079,9 @@ public class InternalSubchannelTest { assertExactCallbackInvokes("onStateChange:CONNECTING"); verify(mockTransportFactory) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); @@ -1097,9 +1100,9 @@ public class InternalSubchannelTest { verify(mockTransportFactory, times(2)) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); assertExactCallbackInvokes("onStateChange:CONNECTING"); assertTrue(reconnectTask.isCancelled()); @@ -1108,9 +1111,9 @@ public class InternalSubchannelTest { assertNoCallbackInvoke(); verify(mockTransportFactory, times(2)) .newClientTransport( - addr, - createClientTransportOptions(), - internalSubchannel.getChannelLogger()); + eq(addr), + eq(createClientTransportOptions()), + isA(TransportLogger.class)); verify(mockBackoffPolicyProvider, times(1)).get(); // Fail the reconnect attempt to verify that a fresh reconnect policy is generated after diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 1d5579351a..e31d2e3735 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -98,6 +98,7 @@ import io.grpc.ServerMethodDefinition; import io.grpc.Status; import io.grpc.StringMarshaller; import io.grpc.internal.ClientTransportFactory.ClientTransportOptions; +import io.grpc.internal.InternalSubchannel.TransportLogger; import io.grpc.internal.TestUtils.MockClientTransportInfo; import io.grpc.stub.ClientCalls; import io.grpc.testing.TestMethodDescriptors; @@ -1256,16 +1257,24 @@ public class ManagedChannelImplTest { // requestConnection() verify(mockTransportFactory, never()) .newClientTransport( - any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); + any(SocketAddress.class), + any(ClientTransportOptions.class), + any(TransportLogger.class)); sub1.requestConnection(); verify(mockTransportFactory) - .newClientTransport(socketAddress, clientTransportOptions, sub1.getChannelLogger()); + .newClientTransport( + eq(socketAddress), + eq(clientTransportOptions), + isA(TransportLogger.class)); MockClientTransportInfo transportInfo1 = transports.poll(); assertNotNull(transportInfo1); sub2.requestConnection(); - verify(mockTransportFactory, times(1)) - .newClientTransport(socketAddress, clientTransportOptions, sub2.getChannelLogger()); + verify(mockTransportFactory, times(2)) + .newClientTransport( + eq(socketAddress), + eq(clientTransportOptions), + isA(TransportLogger.class)); MockClientTransportInfo transportInfo2 = transports.poll(); assertNotNull(transportInfo2); @@ -1274,7 +1283,7 @@ public class ManagedChannelImplTest { // The subchannel doesn't matter since this isn't called verify(mockTransportFactory, times(2)) .newClientTransport( - eq(socketAddress), eq(clientTransportOptions), isA(ChannelLogger.class)); + eq(socketAddress), eq(clientTransportOptions), isA(TransportLogger.class)); // shutdown() has a delay sub1.shutdown(); diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 6cd0bb2fc7..4d1eb4515d 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -581,7 +581,7 @@ public final class NettyChannelBuilder maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(), tooManyPingsRunnable, transportTracer, options.getEagAttributes(), - localSocketPicker); + localSocketPicker, channelLogger); return transport; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 3594cecb9d..f6a87d3a8e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; import io.grpc.CallOptions; +import io.grpc.ChannelLogger; import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalLogId; import io.grpc.Metadata; @@ -94,6 +95,7 @@ class NettyClientTransport implements ConnectionClientTransport { private final TransportTracer transportTracer; private final Attributes eagAttributes; private final LocalSocketPicker localSocketPicker; + private final ChannelLogger channelLogger; NettyClientTransport( SocketAddress address, ChannelFactory channelFactory, @@ -102,7 +104,7 @@ class NettyClientTransport implements ConnectionClientTransport { int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, - LocalSocketPicker localSocketPicker) { + LocalSocketPicker localSocketPicker, ChannelLogger channelLogger) { this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator"); this.negotiationScheme = this.negotiator.scheme(); this.remoteAddress = Preconditions.checkNotNull(address, "address"); @@ -124,6 +126,7 @@ class NettyClientTransport implements ConnectionClientTransport { this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes"); this.localSocketPicker = Preconditions.checkNotNull(localSocketPicker, "localSocketPicker"); this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString()); + this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger"); } @Override diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 51b3c132ed..6b51256458 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -39,6 +39,7 @@ import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; import io.grpc.CallOptions; +import io.grpc.ChannelLogger; import io.grpc.Grpc; import io.grpc.InternalChannelz; import io.grpc.Metadata; @@ -187,7 +188,7 @@ public class NettyClientTransportTest { newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, - new SocketPicker()); + new SocketPicker(), new FakeChannelLogger()); transports.add(transport); callMeMaybe(transport.start(clientTransportListener)); @@ -428,7 +429,8 @@ public class NettyClientTransportTest { new HashMap, Object>(), group, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, - null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker()); + null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(), + new FakeChannelLogger()); transports.add(transport); // Should not throw @@ -644,7 +646,7 @@ public class NettyClientTransportTest { negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, keepAliveTimeNano, keepAliveTimeoutNano, false, authority, userAgent, tooManyPingsRunnable, - new TransportTracer(), eagAttributes, new SocketPicker()); + new TransportTracer(), eagAttributes, new SocketPicker(), new FakeChannelLogger()); transports.add(transport); return transport; } @@ -885,4 +887,13 @@ public class NettyClientTransportTest { return null; } } + + private static final class FakeChannelLogger extends ChannelLogger { + + @Override + public void log(ChannelLogLevel level, String message) {} + + @Override + public void log(ChannelLogLevel level, String messageFormat, Object... args) {} + } }