Add log ID.

To ManagedChannelImpl, TransportSet and all client transport
implementations, so they can be correlated in the logs. Also added more
life-cycle logging in general.
This commit is contained in:
Kun Zhang 2016-03-09 11:44:38 -08:00
parent b9196e3084
commit b9c12327eb
11 changed files with 148 additions and 21 deletions

View File

@ -40,6 +40,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.NoopClientStream;
import io.grpc.internal.ServerStream;
@ -166,7 +167,12 @@ class InProcessTransport implements ServerTransport, ManagedClientTransport {
@Override
public String toString() {
return super.toString() + "(" + name + ")";
return getLogId() + "(" + name + ")";
}
@Override
public String getLogId() {
return GrpcUtil.getLogId(this);
}
private synchronized void notifyShutdown(Status s) {

View File

@ -230,6 +230,11 @@ class DelayedClientTransport implements ManagedClientTransport {
}
}
@Override
public String getLogId() {
return GrpcUtil.getLogId(this);
}
private class PendingStream extends DelayedStream {
private final MethodDescriptor<?, ?> method;
private final Metadata headers;

View File

@ -479,5 +479,12 @@ public final class GrpcUtil {
}
}
/**
* The canonical implementation of {@link WithLogId#getLogId}.
*/
public static String getLogId(WithLogId subject) {
return subject.getClass().getSimpleName() + "@" + Integer.toHexString(subject.hashCode());
}
private GrpcUtil() {}
}

View File

@ -68,6 +68,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
@ -76,7 +78,9 @@ import javax.annotation.concurrent.ThreadSafe;
/** A communication channel for making outgoing RPCs. */
@ThreadSafe
public final class ManagedChannelImpl extends ManagedChannel {
public final class ManagedChannelImpl extends ManagedChannel implements WithLogId {
private static final Logger log = Logger.getLogger(ManagedChannelImpl.class.getName());
// Matching this pattern means the target string is a URI target or at least intended to be one.
// A URI target must be an absolute hierarchical URI.
// From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
@ -173,6 +177,9 @@ public final class ManagedChannelImpl extends ManagedChannel {
loadBalancer.handleNameResolutionError(error);
}
});
if (log.isLoggable(Level.INFO)) {
log.log(Level.INFO, "[{0}] Created with target {1}", new Object[] {getLogId(), target});
}
}
@VisibleForTesting
@ -252,6 +259,9 @@ public final class ManagedChannelImpl extends ManagedChannel {
for (DelayedClientTransport transport : delayedTransportsCopy) {
transport.shutdown();
}
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Shutting down", getLogId());
}
return this;
}
@ -344,6 +354,9 @@ public final class ManagedChannelImpl extends ManagedChannel {
return;
}
if (shutdown && transports.isEmpty() && delayedTransports.isEmpty()) {
if (log.isLoggable(Level.INFO)) {
log.log(Level.INFO, "[{0}] Terminated", getLogId());
}
terminated = true;
lock.notifyAll();
if (usingSharedExecutor) {
@ -380,6 +393,10 @@ public final class ManagedChannelImpl extends ManagedChannel {
}
}
});
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} created for {2}",
new Object[] {getLogId(), ts.getLogId(), addressGroup});
}
transports.put(addressGroup, ts);
}
}
@ -403,6 +420,11 @@ public final class ManagedChannelImpl extends ManagedChannel {
}
};
@Override
public String getLogId() {
return GrpcUtil.getLogId(this);
}
private class InterimTransportImpl implements InterimTransport<ClientTransport> {
private final DelayedClientTransport delayedTransport;
private boolean closed;

View File

@ -47,7 +47,7 @@ import javax.annotation.concurrent.ThreadSafe;
* {@link Listener#transportTerminated}.
*/
@ThreadSafe
public interface ManagedClientTransport extends ClientTransport {
public interface ManagedClientTransport extends ClientTransport, WithLogId {
/**
* Starts transport. This method may only be called once.

View File

@ -59,9 +59,8 @@ import javax.annotation.concurrent.ThreadSafe;
* Transports for a single {@link SocketAddress}.
*/
@ThreadSafe
final class TransportSet {
final class TransportSet implements WithLogId {
private static final Logger log = Logger.getLogger(TransportSet.class.getName());
private static final ClientTransport SHUTDOWN_TRANSPORT =
new FailingClientTransport(Status.UNAVAILABLE.withDescription("TransportSet is shutdown"));
@ -217,8 +216,10 @@ final class TransportSet {
backoffWatch.reset().start();
}
newActiveTransport = transportFactory.newClientTransport(address, authority);
log.log(Level.FINE, "Created transport {0} for {1}",
new Object[] {newActiveTransport, address});
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Created {1} for {2}",
new Object[] {getLogId(), newActiveTransport.getLogId(), address});
}
transports.add(newActiveTransport);
newActiveTransport.start(
new TransportListener(newActiveTransport, address));
@ -258,6 +259,10 @@ final class TransportSet {
}
}
firstAttempt = false;
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Scheduling connection after {1} ms for {2}",
new Object[]{getLogId(), delayMillis, address});
}
if (delayMillis <= 0) {
reconnectTask = null;
// No back-off this time.
@ -315,6 +320,11 @@ final class TransportSet {
}
}
@Override
public String getLogId() {
return GrpcUtil.getLogId(this);
}
/** Shared base for both delayed and real transports. */
private class BaseTransportListener implements ManagedClientTransport.Listener {
protected final ManagedClientTransport transport;
@ -335,6 +345,9 @@ final class TransportSet {
synchronized (lock) {
transports.remove(transport);
if (shutdown && transports.isEmpty()) {
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Terminated", getLogId());
}
runCallback = true;
cancelReconnectTask();
}
@ -360,7 +373,10 @@ final class TransportSet {
@Override
public void transportReady() {
log.log(Level.FINE, "Transport {0} for {1} is ready", new Object[] {transport, address});
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} for {2} is ready",
new Object[] {getLogId(), transport.getLogId(), address});
}
super.transportReady();
synchronized (lock) {
if (isAttachedToActiveTransport()) {
@ -372,8 +388,10 @@ final class TransportSet {
@Override
public void transportShutdown(Status s) {
log.log(Level.FINE, "Transport {0} for {1} is being shutdown",
new Object[] {transport, address});
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} for {2} is being shutdown with status {3}",
new Object[] {getLogId(), transport.getLogId(), address, s});
}
super.transportShutdown(s);
synchronized (lock) {
if (isAttachedToActiveTransport()) {
@ -385,8 +403,10 @@ final class TransportSet {
@Override
public void transportTerminated() {
log.log(Level.FINE, "Transport {0} for {1} is terminated",
new Object[] {transport, address});
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} for {2} is terminated",
new Object[] {getLogId(), transport.getLogId(), address});
}
super.transportTerminated();
Preconditions.checkState(!isAttachedToActiveTransport(),
"Listener is still attached to activeTransport. "

View File

@ -0,0 +1,47 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
/**
* An object that has an ID that is unique within the JVM, primarily for debug logging.
*/
public interface WithLogId {
/**
* Returns an ID that is primarily used in debug logs. It usually contains the class name and a
* numeric ID that is unique among the instances. {@link GprcUtil#getLogId} is a canonical
* implementation.
*
* <p>The subclasses of this interface usually want to include the log ID in their {@link
* #toString} results.
*/
String getLogId();
}

View File

@ -41,6 +41,7 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@ -248,6 +249,7 @@ public class ManagedChannelImplTest {
verify(mockTransportFactory).release();
verifyNoMoreInteractions(mockTransportFactory);
verify(mockTransport, atLeast(0)).getLogId();
verifyNoMoreInteractions(mockTransport);
verifyNoMoreInteractions(mockStream);
}

View File

@ -108,7 +108,7 @@ public class TransportSetTest {
@Test public void singleAddressBackoff() {
SocketAddress addr = mock(SocketAddress.class);
createTransortSet(addr);
createTransportSet(addr);
// Invocation counters
int transportsCreated = 0;
@ -162,7 +162,7 @@ public class TransportSetTest {
@Test public void twoAddressesBackoff() {
SocketAddress addr1 = mock(SocketAddress.class);
SocketAddress addr2 = mock(SocketAddress.class);
createTransortSet(addr1, addr2);
createTransportSet(addr1, addr2);
// Invocation counters
int transportsAddr1 = 0;
@ -232,7 +232,7 @@ public class TransportSetTest {
@Test
public void connectIsLazy() {
SocketAddress addr = mock(SocketAddress.class);
createTransortSet(addr);
createTransportSet(addr);
// Invocation counters
int transportsCreated = 0;
@ -268,7 +268,7 @@ public class TransportSetTest {
@Test
public void shutdownBeforeTransportCreatedWithPendingStream() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createTransortSet(addr);
createTransportSet(addr);
// First transport is created immediately
ClientTransport pick = transportSet.obtainActiveTransport();
@ -316,7 +316,7 @@ public class TransportSetTest {
@Test
public void shutdownBeforeTransportCreatedWithoutPendingStream() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createTransortSet(addr);
createTransportSet(addr);
// First transport is created immediately
ClientTransport pick = transportSet.obtainActiveTransport();
@ -350,7 +350,7 @@ public class TransportSetTest {
@Test
public void obtainTransportAfterShutdown() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createTransortSet(addr);
createTransportSet(addr);
transportSet.shutdown();
ClientTransport pick = transportSet.obtainActiveTransport();
@ -358,7 +358,14 @@ public class TransportSetTest {
verify(mockTransportFactory, times(0)).newClientTransport(addr, authority);
}
private void createTransortSet(SocketAddress ... addrs) {
@Test
public void logId() {
createTransportSet(mock(SocketAddress.class));
assertEquals("TransportSet@" + Integer.toHexString(transportSet.hashCode()),
transportSet.getLogId());
}
private void createTransportSet(SocketAddress ... addrs) {
addressGroup = new EquivalentAddressGroup(Arrays.asList(addrs));
transportSet = new TransportSet(addressGroup, authority, mockLoadBalancer,
mockBackoffPolicyProvider, mockTransportFactory, fakeClock.scheduledExecutorService,

View File

@ -40,6 +40,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.ClientStream;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2Ping;
import io.grpc.internal.ManagedClientTransport;
import io.netty.bootstrap.Bootstrap;
@ -191,7 +192,12 @@ class NettyClientTransport implements ManagedClientTransport {
@Override
public String toString() {
return super.toString() + "(" + address + ")";
return getLogId() + "(" + address + ")";
}
@Override
public String getLogId() {
return GrpcUtil.getLogId(this);
}
/**

View File

@ -400,7 +400,12 @@ class OkHttpClientTransport implements ManagedClientTransport {
@Override
public String toString() {
return super.toString() + "(" + address + ")";
return getLogId() + "(" + address + ")";
}
@Override
public String getLogId() {
return GrpcUtil.getLogId(this);
}
/**