core: channel tracing to log events for channel state change

This commit is contained in:
ZHANG Dapeng 2018-06-04 17:38:26 -07:00 committed by GitHub
parent 8a3e623be7
commit c796901aca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 124 additions and 6 deletions

View File

@ -64,6 +64,9 @@ final class ChannelTracer {
reportEvent(new ChannelTrace.Event.Builder() reportEvent(new ChannelTrace.Event.Builder()
.setDescription(channelType + " created") .setDescription(channelType + " created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO) .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
// passing the timestamp in as a parameter instead of computing it right here because when
// parent channel and subchannel both report the same event of the subchannel (e.g. creation
// event of the subchannel) we want the timestamps to be exactly the same.
.setTimestampNanos(channelCreationTimeNanos) .setTimestampNanos(channelCreationTimeNanos)
.build()); .build());
} }

View File

@ -38,6 +38,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.Channelz.ChannelStats; import io.grpc.internal.Channelz.ChannelStats;
import io.grpc.internal.Channelz.ChannelTrace;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -70,6 +71,7 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
private final CallTracer callsTracer; private final CallTracer callsTracer;
@CheckForNull @CheckForNull
private final ChannelTracer channelTracer; private final ChannelTracer channelTracer;
private final TimeProvider timeProvider;
// File-specific convention: methods without GuardedBy("lock") MUST NOT be called under the lock. // File-specific convention: methods without GuardedBy("lock") MUST NOT be called under the lock.
private final Object lock = new Object(); private final Object lock = new Object();
@ -161,7 +163,8 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
BackoffPolicy.Provider backoffPolicyProvider, BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
Supplier<Stopwatch> stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback, Supplier<Stopwatch> stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback,
Channelz channelz, CallTracer callsTracer, @Nullable ChannelTracer channelTracer) { Channelz channelz, CallTracer callsTracer, @Nullable ChannelTracer channelTracer,
TimeProvider timeProvider) {
this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup"); this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup");
this.authority = authority; this.authority = authority;
this.userAgent = userAgent; this.userAgent = userAgent;
@ -174,6 +177,7 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
this.channelz = channelz; this.channelz = channelz;
this.callsTracer = callsTracer; this.callsTracer = callsTracer;
this.channelTracer = channelTracer; this.channelTracer = channelTracer;
this.timeProvider = timeProvider;
} }
/** /**
@ -314,6 +318,14 @@ final class InternalSubchannel implements Instrumented<ChannelStats> {
Preconditions.checkState(state.getState() != SHUTDOWN, Preconditions.checkState(state.getState() != SHUTDOWN,
"Cannot transition out of SHUTDOWN to " + newState); "Cannot transition out of SHUTDOWN to " + newState);
state = newState; state = newState;
if (channelTracer != null) {
channelTracer.reportEvent(
new ChannelTrace.Event.Builder()
.setDescription("Entering " + state + " state")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timeProvider.currentTimeNanos())
.build());
}
channelExecutor.executeLater(new Runnable() { channelExecutor.executeLater(new Runnable() {
@Override @Override
public void run() { public void run() {

View File

@ -385,6 +385,14 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
shutdownNameResolverAndLoadBalancer(true); shutdownNameResolverAndLoadBalancer(true);
delayedTransport.reprocess(null); delayedTransport.reprocess(null);
nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams); nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
if (channelTracer != null) {
channelTracer.reportEvent(
new ChannelTrace.Event.Builder()
.setDescription("Entering IDLE state")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timeProvider.currentTimeNanos())
.build());
}
channelStateManager.gotoState(IDLE); channelStateManager.gotoState(IDLE);
} }
@ -666,6 +674,13 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
channelExecutor.executeLater(new Runnable() { channelExecutor.executeLater(new Runnable() {
@Override @Override
public void run() { public void run() {
if (channelTracer != null) {
channelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Entering SHUTDOWN state")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timeProvider.currentTimeNanos())
.build());
}
channelStateManager.gotoState(SHUTDOWN); channelStateManager.gotoState(SHUTDOWN);
} }
}); });
@ -724,6 +739,14 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
} }
}; };
updateSubchannelPicker(newPicker); updateSubchannelPicker(newPicker);
if (channelTracer != null) {
channelTracer.reportEvent(
new ChannelTrace.Event.Builder()
.setDescription("Entering TRANSIENT_FAILURE state")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timeProvider.currentTimeNanos())
.build());
}
channelStateManager.gotoState(TRANSIENT_FAILURE); channelStateManager.gotoState(TRANSIENT_FAILURE);
} }
@ -1034,7 +1057,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
}, },
channelz, channelz,
callTracerFactory.create(), callTracerFactory.create(),
subchannelTracer); subchannelTracer,
timeProvider);
if (channelTracer != null) { if (channelTracer != null) {
channelTracer.reportEvent(new ChannelTrace.Event.Builder() channelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child channel created") .setDescription("Child channel created")
@ -1085,6 +1109,14 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
// It's not appropriate to report SHUTDOWN state from lb. // It's not appropriate to report SHUTDOWN state from lb.
// Ignore the case of newState == SHUTDOWN for now. // Ignore the case of newState == SHUTDOWN for now.
if (newState != SHUTDOWN) { if (newState != SHUTDOWN) {
if (channelTracer != null) {
channelTracer.reportEvent(
new ChannelTrace.Event.Builder()
.setDescription("Entering " + newState + " state")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timeProvider.currentTimeNanos())
.build());
}
channelStateManager.gotoState(newState); channelStateManager.gotoState(newState);
} }
} }
@ -1111,7 +1143,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
} }
final OobChannel oobChannel = new OobChannel( final OobChannel oobChannel = new OobChannel(
authority, oobExecutorPool, transportFactory.getScheduledExecutorService(), authority, oobExecutorPool, transportFactory.getScheduledExecutorService(),
channelExecutor, callTracerFactory.create(), oobChannelTracer, channelz); channelExecutor, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
if (channelTracer != null) { if (channelTracer != null) {
channelTracer.reportEvent(new ChannelTrace.Event.Builder() channelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child channel created") .setDescription("Child channel created")
@ -1142,7 +1174,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
}, },
channelz, channelz,
callTracerFactory.create(), callTracerFactory.create(),
subchannelTracer); subchannelTracer,
timeProvider);
if (oobChannelTracer != null) { if (oobChannelTracer != null) {
oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder() oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child channel created") .setDescription("Child channel created")

View File

@ -26,6 +26,7 @@ import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.ClientCall; import io.grpc.ClientCall;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo; import io.grpc.ConnectivityStateInfo;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
@ -39,6 +40,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.Channelz.ChannelStats; import io.grpc.internal.Channelz.ChannelStats;
import io.grpc.internal.Channelz.ChannelTrace;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider; import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -75,6 +77,7 @@ final class OobChannel extends ManagedChannel implements Instrumented<ChannelSta
private final CallTracer channelCallsTracer; private final CallTracer channelCallsTracer;
@CheckForNull @CheckForNull
private final ChannelTracer channelTracer; private final ChannelTracer channelTracer;
private final TimeProvider timeProvider;
private final ClientTransportProvider transportProvider = new ClientTransportProvider() { private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
@Override @Override
@ -95,7 +98,8 @@ final class OobChannel extends ManagedChannel implements Instrumented<ChannelSta
OobChannel( OobChannel(
String authority, ObjectPool<? extends Executor> executorPool, String authority, ObjectPool<? extends Executor> executorPool,
ScheduledExecutorService deadlineCancellationExecutor, ChannelExecutor channelExecutor, ScheduledExecutorService deadlineCancellationExecutor, ChannelExecutor channelExecutor,
CallTracer callsTracer, @Nullable ChannelTracer channelTracer, Channelz channelz) { CallTracer callsTracer, @Nullable ChannelTracer channelTracer, Channelz channelz,
TimeProvider timeProvider) {
this.authority = checkNotNull(authority, "authority"); this.authority = checkNotNull(authority, "authority");
this.executorPool = checkNotNull(executorPool, "executorPool"); this.executorPool = checkNotNull(executorPool, "executorPool");
this.executor = checkNotNull(executorPool.getObject(), "executor"); this.executor = checkNotNull(executorPool.getObject(), "executor");
@ -126,6 +130,7 @@ final class OobChannel extends ManagedChannel implements Instrumented<ChannelSta
}); });
this.channelCallsTracer = callsTracer; this.channelCallsTracer = callsTracer;
this.channelTracer = channelTracer; this.channelTracer = channelTracer;
this.timeProvider = timeProvider;
} }
// Must be called only once, right after the OobChannel is created. // Must be called only once, right after the OobChannel is created.
@ -203,6 +208,14 @@ final class OobChannel extends ManagedChannel implements Instrumented<ChannelSta
return terminatedLatch.await(time, unit); return terminatedLatch.await(time, unit);
} }
@Override
public ConnectivityState getState(boolean requestConnectionIgnored) {
if (subchannel == null) {
return ConnectivityState.IDLE;
}
return subchannel.getState();
}
@Override @Override
public ManagedChannel shutdown() { public ManagedChannel shutdown() {
shutdown = true; shutdown = true;
@ -224,6 +237,14 @@ final class OobChannel extends ManagedChannel implements Instrumented<ChannelSta
} }
void handleSubchannelStateChange(final ConnectivityStateInfo newState) { void handleSubchannelStateChange(final ConnectivityStateInfo newState) {
if (channelTracer != null) {
channelTracer.reportEvent(
new ChannelTrace.Event.Builder()
.setDescription("Entering " + newState.getState() + " state")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timeProvider.currentTimeNanos())
.build());
}
switch (newState.getState()) { switch (newState.getState()) {
case READY: case READY:
case IDLE: case IDLE:

View File

@ -951,7 +951,13 @@ public class InternalSubchannelTest {
internalSubchannel = new InternalSubchannel(addressGroup, AUTHORITY, USER_AGENT, internalSubchannel = new InternalSubchannel(addressGroup, AUTHORITY, USER_AGENT,
mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(), mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(),
fakeClock.getStopwatchSupplier(), channelExecutor, mockInternalSubchannelCallback, fakeClock.getStopwatchSupplier(), channelExecutor, mockInternalSubchannelCallback,
channelz, CallTracer.getDefaultFactory().create(), null); channelz, CallTracer.getDefaultFactory().create(), null,
new TimeProvider() {
@Override
public long currentTimeNanos() {
return fakeClock.getTicker().read();
}
});
} }
private void assertNoCallbackInvoke() { private void assertNoCallbackInvoke() {

View File

@ -2129,6 +2129,49 @@ public class ManagedChannelImplTest {
.build()); .build());
} }
@Test
public void channelTracing_stateChangeEvent() throws Exception {
channelBuilder.maxTraceEvents(10);
createChannel();
timer.forwardNanos(1234);
helper.updateBalancingState(CONNECTING, mockPicker);
assertThat(getStats(channel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
.setDescription("Entering CONNECTING state")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timer.getTicker().read())
.build());
}
@Test
public void channelTracing_subchannelStateChangeEvent() throws Exception {
channelBuilder.maxTraceEvents(10);
createChannel();
AbstractSubchannel subchannel =
(AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
timer.forwardNanos(1234);
subchannel.obtainActiveTransport();
assertThat(getStats(subchannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
.setDescription("Entering CONNECTING state")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timer.getTicker().read())
.build());
}
@Test
public void channelTracing_oobChannelStateChangeEvent() throws Exception {
channelBuilder.maxTraceEvents(10);
createChannel();
OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "authority");
timer.forwardNanos(1234);
oobChannel.handleSubchannelStateChange(
ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
assertThat(getStats(oobChannel).channelTrace.events).contains(new ChannelTrace.Event.Builder()
.setDescription("Entering CONNECTING state")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(timer.getTicker().read())
.build());
}
@Test @Test
public void channelTracing_oobChannelCreationEvents() throws Exception { public void channelTracing_oobChannelCreationEvents() throws Exception {
channelBuilder.maxTraceEvents(10); channelBuilder.maxTraceEvents(10);