netty, core: pass log-only channel logger into transport

This commit is contained in:
Carl Mastrangelo 2019-03-06 11:49:42 -08:00 committed by GitHub
parent 07d7b99e31
commit 5cc71f1de9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 219 additions and 167 deletions

View File

@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.ChannelLogger; import io.grpc.ChannelLogger;
import io.grpc.InternalChannelz.ChannelTrace.Event; import io.grpc.InternalChannelz.ChannelTrace.Event;
import io.grpc.InternalChannelz.ChannelTrace.Event.Severity; import io.grpc.InternalChannelz.ChannelTrace.Event.Severity;
import io.grpc.InternalLogId;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.logging.Level; import java.util.logging.Level;
@ -35,10 +36,7 @@ final class ChannelLoggerImpl extends ChannelLogger {
@Override @Override
public void log(ChannelLogLevel level, String msg) { public void log(ChannelLogLevel level, String msg) {
Level javaLogLevel = toJavaLogLevel(level); logOnly(tracer.getLogId(), level, msg);
if (ChannelTracer.logger.isLoggable(javaLogLevel)) {
tracer.logOnly(javaLogLevel, msg);
}
if (isTraceable(level)) { if (isTraceable(level)) {
trace(level, msg); trace(level, msg);
} }
@ -46,19 +44,27 @@ final class ChannelLoggerImpl extends ChannelLogger {
@Override @Override
public void log(ChannelLogLevel level, String messageFormat, Object... args) { public void log(ChannelLogLevel level, String messageFormat, Object... args) {
Level javaLogLevel = toJavaLogLevel(level);
String msg = null; String msg = null;
if (ChannelTracer.logger.isLoggable(javaLogLevel)) { Level javaLogLevel = toJavaLogLevel(level);
if (msg == null) { if (isTraceable(level) || ChannelTracer.logger.isLoggable(javaLogLevel)) {
msg = MessageFormat.format(messageFormat, args); msg = MessageFormat.format(messageFormat, args);
}
tracer.logOnly(javaLogLevel, msg);
} }
if (isTraceable(level)) { log(level, msg);
if (msg == null) { }
msg = MessageFormat.format(messageFormat, args);
} static void logOnly(InternalLogId logId, ChannelLogLevel level, String msg) {
trace(level, msg); Level javaLogLevel = toJavaLogLevel(level);
if (ChannelTracer.logger.isLoggable(javaLogLevel)) {
ChannelTracer.logOnly(logId, javaLogLevel, msg);
}
}
static void logOnly(
InternalLogId logId, ChannelLogLevel level, String messageFormat, Object... args) {
Level javaLogLevel = toJavaLogLevel(level);
if (ChannelTracer.logger.isLoggable(javaLogLevel)) {
String msg = MessageFormat.format(messageFormat, args);
ChannelTracer.logOnly(logId, javaLogLevel, msg);
} }
} }
@ -77,7 +83,7 @@ final class ChannelLoggerImpl extends ChannelLogger {
.build()); .build());
} }
private Severity toTracerSeverity(ChannelLogLevel level) { private static Severity toTracerSeverity(ChannelLogLevel level) {
switch (level) { switch (level) {
case ERROR: case ERROR:
return Severity.CT_ERROR; return Severity.CT_ERROR;
@ -88,7 +94,7 @@ final class ChannelLoggerImpl extends ChannelLogger {
} }
} }
private Level toJavaLogLevel(ChannelLogLevel level) { private static Level toJavaLogLevel(ChannelLogLevel level) {
switch (level) { switch (level) {
case ERROR: case ERROR:
return Level.FINE; return Level.FINE;

View File

@ -102,7 +102,7 @@ final class ChannelTracer {
logLevel = Level.FINEST; logLevel = Level.FINEST;
} }
traceOnly(event); traceOnly(event);
logOnly(logLevel, event.description); logOnly(logId, logLevel, event.description);
} }
boolean isTraceEnabled() { boolean isTraceEnabled() {
@ -119,7 +119,7 @@ final class ChannelTracer {
} }
} }
void logOnly(Level logLevel, String msg) { static void logOnly(InternalLogId logId, Level logLevel, String msg) {
if (logger.isLoggable(logLevel)) { if (logger.isLoggable(logLevel)) {
logger.log(logLevel, "[" + logId + "] " + msg); logger.log(logLevel, "[" + logId + "] " + msg);
} }

View File

@ -78,7 +78,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats> {
private final InternalChannelz channelz; private final InternalChannelz channelz;
private final CallTracer callsTracer; private final CallTracer callsTracer;
private final ChannelTracer channelTracer; private final ChannelTracer channelTracer;
private final ChannelLogger channelLogger; private final ChannelLoggerImpl channelLogger;
// File-specific convention: methods without GuardedBy("lock") MUST NOT be called under the lock. // File-specific convention: methods without GuardedBy("lock") MUST NOT be called under the lock.
private final Object lock = new Object(); private final Object lock = new Object();
@ -258,9 +258,13 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats> {
.setEagAttributes(addressIndex.getCurrentEagAttributes()) .setEagAttributes(addressIndex.getCurrentEagAttributes())
.setUserAgent(userAgent) .setUserAgent(userAgent)
.setHttpConnectProxiedSocketAddress(proxiedAddr); .setHttpConnectProxiedSocketAddress(proxiedAddr);
TransportLogger transportLogger = new TransportLogger();
// In case the transport logs in the constructor, use the subchannel logId
transportLogger.logId = getLogId();
ConnectionClientTransport transport = ConnectionClientTransport transport =
new CallTracingTransport( new CallTracingTransport(
transportFactory.newClientTransport(address, options, channelLogger), callsTracer); transportFactory.newClientTransport(address, options, transportLogger), callsTracer);
transportLogger.logId = transport.getLogId();
channelz.addClientSocket(transport); channelz.addClientSocket(transport);
pendingTransport = transport; pendingTransport = transport;
transports.add(transport); transports.add(transport);
@ -268,6 +272,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats> {
if (runnable != null) { if (runnable != null) {
syncContext.executeLater(runnable); syncContext.executeLater(runnable);
} }
channelLogger.log(ChannelLogLevel.INFO, "Started transport {0}", transportLogger.logId);
} }
/** /**
@ -505,7 +510,6 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats> {
return logId; return logId;
} }
@Override @Override
public ListenableFuture<ChannelStats> getStats() { public ListenableFuture<ChannelStats> getStats() {
SettableFuture<ChannelStats> ret = SettableFuture.create(); SettableFuture<ChannelStats> ret = SettableFuture.create();
@ -797,4 +801,20 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats> {
} }
return buffer.toString(); return buffer.toString();
} }
@VisibleForTesting
static final class TransportLogger extends ChannelLogger {
// Changed just after construction to break a cyclic dependency.
InternalLogId logId;
@Override
public void log(ChannelLogLevel level, String message) {
ChannelLoggerImpl.logOnly(logId, level, message);
}
@Override
public void log(ChannelLogLevel level, String messageFormat, Object... args) {
ChannelLoggerImpl.logOnly(logId, level, messageFormat, args);
}
}
} }

View File

@ -29,6 +29,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -48,6 +50,7 @@ import io.grpc.Status;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.internal.InternalSubchannel.CallTracingTransport; import io.grpc.internal.InternalSubchannel.CallTracingTransport;
import io.grpc.internal.InternalSubchannel.Index; import io.grpc.internal.InternalSubchannel.Index;
import io.grpc.internal.InternalSubchannel.TransportLogger;
import io.grpc.internal.TestUtils.MockClientTransportInfo; import io.grpc.internal.TestUtils.MockClientTransportInfo;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Arrays; import java.util.Arrays;
@ -168,9 +171,9 @@ public class InternalSubchannelTest {
assertNull(internalSubchannel.obtainActiveTransport()); assertNull(internalSubchannel.obtainActiveTransport());
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory).newClientTransport( verify(mockTransportFactory).newClientTransport(
addr, eq(addr),
createClientTransportOptions().setEagAttributes(attr), eq(createClientTransportOptions().setEagAttributes(attr)),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
} }
@Test public void singleAddressReconnect() { @Test public void singleAddressReconnect() {
@ -192,9 +195,9 @@ public class InternalSubchannelTest {
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory, times(++transportsCreated)) verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Fail this one. Because there is only one address to try, enter TRANSIENT_FAILURE. // Fail this one. Because there is only one address to try, enter TRANSIENT_FAILURE.
assertNoCallbackInvoke(); assertNoCallbackInvoke();
@ -211,9 +214,9 @@ public class InternalSubchannelTest {
assertNull(internalSubchannel.obtainActiveTransport()); assertNull(internalSubchannel.obtainActiveTransport());
verify(mockTransportFactory, times(transportsCreated)) verify(mockTransportFactory, times(transportsCreated))
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
assertNoCallbackInvoke(); assertNoCallbackInvoke();
@ -222,9 +225,9 @@ public class InternalSubchannelTest {
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory, times(++transportsCreated)) verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Fail this one too // Fail this one too
assertNoCallbackInvoke(); assertNoCallbackInvoke();
// Here we use a different status from the first failure, and verify that it's passed to // Here we use a different status from the first failure, and verify that it's passed to
@ -242,9 +245,9 @@ public class InternalSubchannelTest {
assertNull(internalSubchannel.obtainActiveTransport()); assertNull(internalSubchannel.obtainActiveTransport());
verify(mockTransportFactory, times(transportsCreated)) verify(mockTransportFactory, times(transportsCreated))
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
assertNoCallbackInvoke(); assertNoCallbackInvoke();
fakeClock.forwardNanos(1); fakeClock.forwardNanos(1);
@ -253,9 +256,9 @@ public class InternalSubchannelTest {
assertNull(internalSubchannel.obtainActiveTransport()); assertNull(internalSubchannel.obtainActiveTransport());
verify(mockTransportFactory, times(++transportsCreated)) verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Let this one succeed, will enter READY state. // Let this one succeed, will enter READY state.
assertNoCallbackInvoke(); assertNoCallbackInvoke();
transports.peek().listener.transportReady(); transports.peek().listener.transportReady();
@ -278,9 +281,9 @@ public class InternalSubchannelTest {
verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsCreated)) verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Final checks for consultations on back-off policies // Final checks for consultations on back-off policies
verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos(); verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos();
@ -307,9 +310,9 @@ public class InternalSubchannelTest {
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory, times(++transportsAddr1)) verify(mockTransportFactory, times(++transportsAddr1))
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Let this one fail without success // Let this one fail without success
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
@ -322,9 +325,9 @@ public class InternalSubchannelTest {
verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)) verify(mockTransportFactory, times(++transportsAddr2))
.newClientTransport( .newClientTransport(
addr2, eq(addr2),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
assertNull(internalSubchannel.obtainActiveTransport()); assertNull(internalSubchannel.obtainActiveTransport());
// Fail this one too // Fail this one too
assertNoCallbackInvoke(); assertNoCallbackInvoke();
@ -345,9 +348,9 @@ public class InternalSubchannelTest {
fakeClock.forwardNanos(9); fakeClock.forwardNanos(9);
verify(mockTransportFactory, times(transportsAddr1)) verify(mockTransportFactory, times(transportsAddr1))
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
assertNoCallbackInvoke(); assertNoCallbackInvoke();
fakeClock.forwardNanos(1); fakeClock.forwardNanos(1);
@ -355,9 +358,9 @@ public class InternalSubchannelTest {
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory, times(++transportsAddr1)) verify(mockTransportFactory, times(++transportsAddr1))
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Fail this one too // Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
@ -368,9 +371,9 @@ public class InternalSubchannelTest {
verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)) verify(mockTransportFactory, times(++transportsAddr2))
.newClientTransport( .newClientTransport(
addr2, eq(addr2),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Fail this one too // Fail this one too
assertNoCallbackInvoke(); assertNoCallbackInvoke();
transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED); transports.poll().listener.transportShutdown(Status.RESOURCE_EXHAUSTED);
@ -386,9 +389,9 @@ public class InternalSubchannelTest {
fakeClock.forwardNanos(99); fakeClock.forwardNanos(99);
verify(mockTransportFactory, times(transportsAddr1)) verify(mockTransportFactory, times(transportsAddr1))
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
assertNoCallbackInvoke(); assertNoCallbackInvoke();
fakeClock.forwardNanos(1); fakeClock.forwardNanos(1);
@ -396,9 +399,9 @@ public class InternalSubchannelTest {
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory, times(++transportsAddr1)) verify(mockTransportFactory, times(++transportsAddr1))
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Let it through // Let it through
assertNoCallbackInvoke(); assertNoCallbackInvoke();
transports.peek().listener.transportReady(); transports.peek().listener.transportReady();
@ -422,9 +425,9 @@ public class InternalSubchannelTest {
verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)) verify(mockTransportFactory, times(++transportsAddr1))
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Fail the transport // Fail the transport
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
@ -433,9 +436,9 @@ public class InternalSubchannelTest {
verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)) verify(mockTransportFactory, times(++transportsAddr2))
.newClientTransport( .newClientTransport(
addr2, eq(addr2),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Fail this one too // Fail this one too
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
@ -450,9 +453,9 @@ public class InternalSubchannelTest {
fakeClock.forwardNanos(9); fakeClock.forwardNanos(9);
verify(mockTransportFactory, times(transportsAddr1)) verify(mockTransportFactory, times(transportsAddr1))
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState()); assertEquals(TRANSIENT_FAILURE, internalSubchannel.getState());
assertNoCallbackInvoke(); assertNoCallbackInvoke();
fakeClock.forwardNanos(1); fakeClock.forwardNanos(1);
@ -460,9 +463,9 @@ public class InternalSubchannelTest {
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
verify(mockTransportFactory, times(++transportsAddr1)) verify(mockTransportFactory, times(++transportsAddr1))
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Final checks on invocations on back-off policies // Final checks on invocations on back-off policies
verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos(); verify(mockBackoffPolicy1, times(backoff1Consulted)).nextBackoffNanos();
@ -499,18 +502,18 @@ public class InternalSubchannelTest {
assertExactCallbackInvokes("onStateChange:CONNECTING"); assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
// Second address connects // Second address connects
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr2, eq(addr2),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.peek().listener.transportReady(); transports.peek().listener.transportReady();
assertExactCallbackInvokes("onStateChange:READY"); assertExactCallbackInvokes("onStateChange:READY");
assertEquals(READY, internalSubchannel.getState()); assertEquals(READY, internalSubchannel.getState());
@ -531,15 +534,15 @@ public class InternalSubchannelTest {
assertEquals(0, fakeClock.numPendingTasks()); assertEquals(0, fakeClock.numPendingTasks());
verify(mockTransportFactory, times(2)) verify(mockTransportFactory, times(2))
.newClientTransport( .newClientTransport(
addr2, eq(addr2),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr3, eq(addr3),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verifyNoMoreInteractions(mockTransportFactory); verifyNoMoreInteractions(mockTransportFactory);
@ -558,18 +561,18 @@ public class InternalSubchannelTest {
assertExactCallbackInvokes("onStateChange:CONNECTING"); assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
// Second address connecting // Second address connecting
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr2, eq(addr2),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
assertNoCallbackInvoke(); assertNoCallbackInvoke();
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
@ -591,15 +594,15 @@ public class InternalSubchannelTest {
assertEquals(0, fakeClock.numPendingTasks()); assertEquals(0, fakeClock.numPendingTasks());
verify(mockTransportFactory, times(2)) verify(mockTransportFactory, times(2))
.newClientTransport( .newClientTransport(
addr2, eq(addr2),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr3, eq(addr3),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verifyNoMoreInteractions(mockTransportFactory); verifyNoMoreInteractions(mockTransportFactory);
@ -616,14 +619,14 @@ public class InternalSubchannelTest {
// Nothing happened on address update // Nothing happened on address update
verify(mockTransportFactory, never()) verify(mockTransportFactory, never())
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
verify(mockTransportFactory, never()) verify(mockTransportFactory, never())
.newClientTransport( .newClientTransport(
addr2, eq(addr2),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
verifyNoMoreInteractions(mockTransportFactory); verifyNoMoreInteractions(mockTransportFactory);
assertNoCallbackInvoke(); assertNoCallbackInvoke();
assertEquals(IDLE, internalSubchannel.getState()); assertEquals(IDLE, internalSubchannel.getState());
@ -633,9 +636,9 @@ public class InternalSubchannelTest {
assertExactCallbackInvokes("onStateChange:CONNECTING"); assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr2, eq(addr2),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// And no other addresses attempted // And no other addresses attempted
assertEquals(0, fakeClock.numPendingTasks()); assertEquals(0, fakeClock.numPendingTasks());
@ -660,18 +663,18 @@ public class InternalSubchannelTest {
assertExactCallbackInvokes("onStateChange:CONNECTING"); assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
// Second address connects // Second address connects
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr2, eq(addr2),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.peek().listener.transportReady(); transports.peek().listener.transportReady();
assertExactCallbackInvokes("onStateChange:READY"); assertExactCallbackInvokes("onStateChange:READY");
assertEquals(READY, internalSubchannel.getState()); assertEquals(READY, internalSubchannel.getState());
@ -692,15 +695,15 @@ public class InternalSubchannelTest {
assertEquals(0, fakeClock.numPendingTasks()); assertEquals(0, fakeClock.numPendingTasks());
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr3, eq(addr3),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr4, eq(addr4),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verifyNoMoreInteractions(mockTransportFactory); verifyNoMoreInteractions(mockTransportFactory);
@ -720,18 +723,18 @@ public class InternalSubchannelTest {
assertExactCallbackInvokes("onStateChange:CONNECTING"); assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr1, eq(addr1),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
// Second address connecting // Second address connecting
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr2, eq(addr2),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
assertNoCallbackInvoke(); assertNoCallbackInvoke();
assertEquals(CONNECTING, internalSubchannel.getState()); assertEquals(CONNECTING, internalSubchannel.getState());
@ -750,15 +753,15 @@ public class InternalSubchannelTest {
assertEquals(0, fakeClock.numPendingTasks()); assertEquals(0, fakeClock.numPendingTasks());
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr3, eq(addr3),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr4, eq(addr4),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verifyNoMoreInteractions(mockTransportFactory); verifyNoMoreInteractions(mockTransportFactory);
@ -776,18 +779,18 @@ public class InternalSubchannelTest {
// Won't connect until requested // Won't connect until requested
verify(mockTransportFactory, times(transportsCreated)) verify(mockTransportFactory, times(transportsCreated))
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// First attempt // First attempt
internalSubchannel.obtainActiveTransport(); internalSubchannel.obtainActiveTransport();
assertExactCallbackInvokes("onStateChange:CONNECTING"); assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory, times(++transportsCreated)) verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Fail this one // Fail this one
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
@ -798,9 +801,9 @@ public class InternalSubchannelTest {
assertExactCallbackInvokes("onStateChange:CONNECTING"); assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory, times(++transportsCreated)) verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Make this one proceed // Make this one proceed
transports.peek().listener.transportReady(); transports.peek().listener.transportReady();
@ -818,9 +821,9 @@ public class InternalSubchannelTest {
assertExactCallbackInvokes("onStateChange:CONNECTING"); assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory, times(++transportsCreated)) verify(mockTransportFactory, times(++transportsCreated))
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
} }
@Test @Test
@ -852,9 +855,9 @@ public class InternalSubchannelTest {
assertExactCallbackInvokes("onStateChange:CONNECTING"); assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
// Fail this one // Fail this one
MockClientTransportInfo transportInfo = transports.poll(); MockClientTransportInfo transportInfo = transports.poll();
@ -1076,9 +1079,9 @@ public class InternalSubchannelTest {
assertExactCallbackInvokes("onStateChange:CONNECTING"); assertExactCallbackInvokes("onStateChange:CONNECTING");
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
transports.poll().listener.transportShutdown(Status.UNAVAILABLE); transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE); assertExactCallbackInvokes("onStateChange:" + UNAVAILABLE_STATE);
@ -1097,9 +1100,9 @@ public class InternalSubchannelTest {
verify(mockTransportFactory, times(2)) verify(mockTransportFactory, times(2))
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
assertExactCallbackInvokes("onStateChange:CONNECTING"); assertExactCallbackInvokes("onStateChange:CONNECTING");
assertTrue(reconnectTask.isCancelled()); assertTrue(reconnectTask.isCancelled());
@ -1108,9 +1111,9 @@ public class InternalSubchannelTest {
assertNoCallbackInvoke(); assertNoCallbackInvoke();
verify(mockTransportFactory, times(2)) verify(mockTransportFactory, times(2))
.newClientTransport( .newClientTransport(
addr, eq(addr),
createClientTransportOptions(), eq(createClientTransportOptions()),
internalSubchannel.getChannelLogger()); isA(TransportLogger.class));
verify(mockBackoffPolicyProvider, times(1)).get(); verify(mockBackoffPolicyProvider, times(1)).get();
// Fail the reconnect attempt to verify that a fresh reconnect policy is generated after // Fail the reconnect attempt to verify that a fresh reconnect policy is generated after

View File

@ -98,6 +98,7 @@ import io.grpc.ServerMethodDefinition;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.StringMarshaller; import io.grpc.StringMarshaller;
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions; import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
import io.grpc.internal.InternalSubchannel.TransportLogger;
import io.grpc.internal.TestUtils.MockClientTransportInfo; import io.grpc.internal.TestUtils.MockClientTransportInfo;
import io.grpc.stub.ClientCalls; import io.grpc.stub.ClientCalls;
import io.grpc.testing.TestMethodDescriptors; import io.grpc.testing.TestMethodDescriptors;
@ -1256,16 +1257,24 @@ public class ManagedChannelImplTest {
// requestConnection() // requestConnection()
verify(mockTransportFactory, never()) verify(mockTransportFactory, never())
.newClientTransport( .newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class)); any(SocketAddress.class),
any(ClientTransportOptions.class),
any(TransportLogger.class));
sub1.requestConnection(); sub1.requestConnection();
verify(mockTransportFactory) verify(mockTransportFactory)
.newClientTransport(socketAddress, clientTransportOptions, sub1.getChannelLogger()); .newClientTransport(
eq(socketAddress),
eq(clientTransportOptions),
isA(TransportLogger.class));
MockClientTransportInfo transportInfo1 = transports.poll(); MockClientTransportInfo transportInfo1 = transports.poll();
assertNotNull(transportInfo1); assertNotNull(transportInfo1);
sub2.requestConnection(); sub2.requestConnection();
verify(mockTransportFactory, times(1)) verify(mockTransportFactory, times(2))
.newClientTransport(socketAddress, clientTransportOptions, sub2.getChannelLogger()); .newClientTransport(
eq(socketAddress),
eq(clientTransportOptions),
isA(TransportLogger.class));
MockClientTransportInfo transportInfo2 = transports.poll(); MockClientTransportInfo transportInfo2 = transports.poll();
assertNotNull(transportInfo2); assertNotNull(transportInfo2);
@ -1274,7 +1283,7 @@ public class ManagedChannelImplTest {
// The subchannel doesn't matter since this isn't called // The subchannel doesn't matter since this isn't called
verify(mockTransportFactory, times(2)) verify(mockTransportFactory, times(2))
.newClientTransport( .newClientTransport(
eq(socketAddress), eq(clientTransportOptions), isA(ChannelLogger.class)); eq(socketAddress), eq(clientTransportOptions), isA(TransportLogger.class));
// shutdown() has a delay // shutdown() has a delay
sub1.shutdown(); sub1.shutdown();

View File

@ -581,7 +581,7 @@ public final class NettyChannelBuilder
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(), keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
tooManyPingsRunnable, transportTracer, options.getEagAttributes(), tooManyPingsRunnable, transportTracer, options.getEagAttributes(),
localSocketPicker); localSocketPicker, channelLogger);
return transport; return transport;
} }

View File

@ -26,6 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.ChannelLogger;
import io.grpc.InternalChannelz.SocketStats; import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalLogId; import io.grpc.InternalLogId;
import io.grpc.Metadata; import io.grpc.Metadata;
@ -94,6 +95,7 @@ class NettyClientTransport implements ConnectionClientTransport {
private final TransportTracer transportTracer; private final TransportTracer transportTracer;
private final Attributes eagAttributes; private final Attributes eagAttributes;
private final LocalSocketPicker localSocketPicker; private final LocalSocketPicker localSocketPicker;
private final ChannelLogger channelLogger;
NettyClientTransport( NettyClientTransport(
SocketAddress address, ChannelFactory<? extends Channel> channelFactory, SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
@ -102,7 +104,7 @@ class NettyClientTransport implements ConnectionClientTransport {
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes,
LocalSocketPicker localSocketPicker) { LocalSocketPicker localSocketPicker, ChannelLogger channelLogger) {
this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator"); this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
this.negotiationScheme = this.negotiator.scheme(); this.negotiationScheme = this.negotiator.scheme();
this.remoteAddress = Preconditions.checkNotNull(address, "address"); this.remoteAddress = Preconditions.checkNotNull(address, "address");
@ -124,6 +126,7 @@ class NettyClientTransport implements ConnectionClientTransport {
this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes"); this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes");
this.localSocketPicker = Preconditions.checkNotNull(localSocketPicker, "localSocketPicker"); this.localSocketPicker = Preconditions.checkNotNull(localSocketPicker, "localSocketPicker");
this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString()); this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString());
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
} }
@Override @Override

View File

@ -39,6 +39,7 @@ import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.ChannelLogger;
import io.grpc.Grpc; import io.grpc.Grpc;
import io.grpc.InternalChannelz; import io.grpc.InternalChannelz;
import io.grpc.Metadata; import io.grpc.Metadata;
@ -187,7 +188,7 @@ public class NettyClientTransportTest {
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority,
null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY,
new SocketPicker()); new SocketPicker(), new FakeChannelLogger());
transports.add(transport); transports.add(transport);
callMeMaybe(transport.start(clientTransportListener)); callMeMaybe(transport.start(clientTransportListener));
@ -428,7 +429,8 @@ public class NettyClientTransportTest {
new HashMap<ChannelOption<?>, Object>(), group, new HashMap<ChannelOption<?>, Object>(), group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker()); null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(),
new FakeChannelLogger());
transports.add(transport); transports.add(transport);
// Should not throw // Should not throw
@ -644,7 +646,7 @@ public class NettyClientTransportTest {
negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano, keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable, false, authority, userAgent, tooManyPingsRunnable,
new TransportTracer(), eagAttributes, new SocketPicker()); new TransportTracer(), eagAttributes, new SocketPicker(), new FakeChannelLogger());
transports.add(transport); transports.add(transport);
return transport; return transport;
} }
@ -885,4 +887,13 @@ public class NettyClientTransportTest {
return null; return null;
} }
} }
private static final class FakeChannelLogger extends ChannelLogger {
@Override
public void log(ChannelLogLevel level, String message) {}
@Override
public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
}
} }