core: ManagedChannel should implement InternalInstrumented

This commit is contained in:
zpencer 2017-12-14 18:09:46 -08:00 committed by GitHub
parent 4c17382a43
commit 7e655beb88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 191 additions and 108 deletions

View File

@ -0,0 +1,39 @@
/*
* Copyright 2017, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
import javax.annotation.concurrent.Immutable;
/**
* A data class to represent a channel's stats.
*/
// Not final so that InternalChannelStats can make this class visible outside of io.grpc
@Immutable
class ChannelStats {
public final long callsStarted;
public final long callsSucceeded;
public final long callsFailed;
public final long lastCallStartedMillis;
ChannelStats(
long callsStarted, long callsSucceeded, long callsFailed, long lastCallStartedMillis) {
this.callsStarted = callsStarted;
this.callsSucceeded = callsSucceeded;
this.callsFailed = callsFailed;
this.lastCallStartedMillis = lastCallStartedMillis;
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2017, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
/**
* An internal gRPC class. Do not use.
*/
@Internal
public final class InternalChannelStats extends ChannelStats {
public InternalChannelStats(
long callsStarted,
long callsSucceeded,
long callsFailed,
long lastCallStartedMillis) {
super(callsStarted, callsSucceeded, callsFailed, lastCallStartedMillis);
}
}

View File

@ -23,7 +23,8 @@ import javax.annotation.concurrent.ThreadSafe;
* A {@link Channel} that provides lifecycle management. * A {@link Channel} that provides lifecycle management.
*/ */
@ThreadSafe @ThreadSafe
public abstract class ManagedChannel extends Channel { public abstract class ManagedChannel
extends Channel implements InternalInstrumented<InternalChannelStats> {
/** /**
* Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
* cancelled. * cancelled.

View File

@ -346,7 +346,7 @@ public abstract class AbstractManagedChannelImplBuilder
GrpcUtil.STOPWATCH_SUPPLIER, GrpcUtil.STOPWATCH_SUPPLIER,
getEffectiveInterceptors(), getEffectiveInterceptors(),
GrpcUtil.getProxyDetector(), GrpcUtil.getProxyDetector(),
ChannelStats.getDefaultFactory()); ChannelTracer.getDefaultFactory());
} }
@VisibleForTesting @VisibleForTesting

View File

@ -17,23 +17,19 @@
package io.grpc.internal; package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import io.grpc.InternalChannelStats;
/** /**
* A collection of channel level stats for channelz. * A collection of channel level stats for channelz.
*/ */
final class ChannelStats { final class ChannelTracer {
private final TimeProvider timeProvider; private final TimeProvider timeProvider;
private final LongCounter callsStarted = LongCounterFactory.create(); private final LongCounter callsStarted = LongCounterFactory.create();
private final LongCounter callsSucceeded = LongCounterFactory.create(); private final LongCounter callsSucceeded = LongCounterFactory.create();
private final LongCounter callsFailed = LongCounterFactory.create(); private final LongCounter callsFailed = LongCounterFactory.create();
private volatile long lastCallStartedMillis; private volatile long lastCallStartedMillis;
private ChannelStats() { ChannelTracer(TimeProvider timeProvider) {
timeProvider = SYSTEM_TIME_PROVIDER;
}
@VisibleForTesting
ChannelStats(TimeProvider timeProvider) {
this.timeProvider = timeProvider; this.timeProvider = timeProvider;
} }
@ -50,20 +46,9 @@ final class ChannelStats {
} }
} }
long getCallsStarted() { public InternalChannelStats getStats() {
return callsStarted.value(); return new InternalChannelStats(
} callsStarted.value(), callsSucceeded.value(), callsFailed.value(), lastCallStartedMillis);
long getCallsSucceeded() {
return callsSucceeded.value();
}
long getCallsFailed() {
return callsFailed.value();
}
long getLastCallStartedMillis() {
return lastCallStartedMillis;
} }
@VisibleForTesting @VisibleForTesting
@ -72,24 +57,25 @@ final class ChannelStats {
long currentTimeMillis(); long currentTimeMillis();
} }
private static final TimeProvider SYSTEM_TIME_PROVIDER = new TimeProvider() { public interface Factory {
ChannelTracer create();
}
static final TimeProvider SYSTEM_TIME_PROVIDER = new TimeProvider() {
@Override @Override
public long currentTimeMillis() { public long currentTimeMillis() {
return System.currentTimeMillis(); return System.currentTimeMillis();
} }
}; };
private static final Factory DEFAULT_FACTORY = new Factory() {
static final Factory DEFAULT_FACTORY = new Factory() {
@Override @Override
public ChannelStats create() { public ChannelTracer create() {
return new ChannelStats(SYSTEM_TIME_PROVIDER); return new ChannelTracer(SYSTEM_TIME_PROVIDER);
} }
}; };
public static Factory getDefaultFactory() { public static Factory getDefaultFactory() {
return DEFAULT_FACTORY; return DEFAULT_FACTORY;
} }
public interface Factory {
ChannelStats create();
}
} }

View File

@ -68,7 +68,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> method; private final MethodDescriptor<ReqT, RespT> method;
private final Executor callExecutor; private final Executor callExecutor;
private final ChannelStats channelStats; private final ChannelTracer channelTracer;
private final Context context; private final Context context;
private volatile ScheduledFuture<?> deadlineCancellationFuture; private volatile ScheduledFuture<?> deadlineCancellationFuture;
private final boolean unaryRequest; private final boolean unaryRequest;
@ -88,7 +88,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions, MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
ClientTransportProvider clientTransportProvider, ClientTransportProvider clientTransportProvider,
ScheduledExecutorService deadlineCancellationExecutor, ScheduledExecutorService deadlineCancellationExecutor,
ChannelStats channelStats) { ChannelTracer channelTracer) {
this.method = method; this.method = method;
// If we know that the executor is a direct executor, we don't need to wrap it with a // If we know that the executor is a direct executor, we don't need to wrap it with a
// SerializingExecutor. This is purely for performance reasons. // SerializingExecutor. This is purely for performance reasons.
@ -96,7 +96,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
this.callExecutor = executor == directExecutor() this.callExecutor = executor == directExecutor()
? new SerializeReentrantCallsDirectExecutor() ? new SerializeReentrantCallsDirectExecutor()
: new SerializingExecutor(executor); : new SerializingExecutor(executor);
this.channelStats = channelStats; this.channelTracer = channelTracer;
// Propagate the context from the thread which initiated the call to all callbacks. // Propagate the context from the thread which initiated the call to all callbacks.
this.context = Context.current(); this.context = Context.current();
this.unaryRequest = method.getType() == MethodType.UNARY this.unaryRequest = method.getType() == MethodType.UNARY
@ -261,7 +261,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
stream.setCompressor(compressor); stream.setCompressor(compressor);
stream.setFullStreamDecompression(fullStreamDecompression); stream.setFullStreamDecompression(fullStreamDecompression);
stream.setDecompressorRegistry(decompressorRegistry); stream.setDecompressorRegistry(decompressorRegistry);
channelStats.reportCallStarted(); channelTracer.reportCallStarted();
stream.start(new ClientStreamListenerImpl(observer)); stream.start(new ClientStreamListenerImpl(observer));
// Delay any sources of cancellation after start(), because most of the transports are broken if // Delay any sources of cancellation after start(), because most of the transports are broken if
@ -557,7 +557,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
closeObserver(observer, status, trailers); closeObserver(observer, status, trailers);
} finally { } finally {
removeContextListenerAndCancelDeadlineFuture(); removeContextListenerAndCancelDeadlineFuture();
channelStats.reportCallEnded(status.isOk()); channelTracer.reportCallEnded(status.isOk());
} }
} }

View File

@ -26,6 +26,8 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.Channel; import io.grpc.Channel;
@ -38,8 +40,8 @@ import io.grpc.ConnectivityStateInfo;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.DecompressorRegistry; import io.grpc.DecompressorRegistry;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalChannelStats;
import io.grpc.InternalLogId; import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.PickSubchannelArgs;
@ -76,7 +78,7 @@ import javax.annotation.concurrent.ThreadSafe;
/** A communication channel for making outgoing RPCs. */ /** A communication channel for making outgoing RPCs. */
@ThreadSafe @ThreadSafe
public final class ManagedChannelImpl extends ManagedChannel implements InternalWithLogId { public final class ManagedChannelImpl extends ManagedChannel {
static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName()); static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
// Matching this pattern means the target string is a URI target or at least intended to be one. // Matching this pattern means the target string is a URI target or at least intended to be one.
@ -188,8 +190,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements Internal
private final ManagedChannelReference phantom; private final ManagedChannelReference phantom;
private final ChannelStats.Factory channelStatsFactory; // to create new stats for each oobchannel private final ChannelTracer.Factory channelTracerFactory; // new tracer for each oobchannel
final ChannelStats channelStats; private final ChannelTracer channelTracer;
// Called from channelExecutor // Called from channelExecutor
private final ManagedClientTransport.Listener delayedTransportListener = private final ManagedClientTransport.Listener delayedTransportListener =
@ -260,6 +262,13 @@ public final class ManagedChannelImpl extends ManagedChannel implements Internal
} }
}; };
@Override
public ListenableFuture<InternalChannelStats> getStats() {
SettableFuture<InternalChannelStats> ret = SettableFuture.create();
ret.set(channelTracer.getStats());
return ret;
}
// Run from channelExecutor // Run from channelExecutor
private class IdleModeTimer implements Runnable { private class IdleModeTimer implements Runnable {
// Only mutated from channelExecutor // Only mutated from channelExecutor
@ -434,7 +443,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Internal
Supplier<Stopwatch> stopwatchSupplier, Supplier<Stopwatch> stopwatchSupplier,
List<ClientInterceptor> interceptors, List<ClientInterceptor> interceptors,
ProxyDetector proxyDetector, ProxyDetector proxyDetector,
ChannelStats.Factory channelStatsFactory) { ChannelTracer.Factory channelTracerFactory) {
this.target = checkNotNull(builder.target, "target"); this.target = checkNotNull(builder.target, "target");
this.nameResolverFactory = builder.getNameResolverFactory(); this.nameResolverFactory = builder.getNameResolverFactory();
this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams"); this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams");
@ -467,8 +476,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements Internal
this.proxyDetector = proxyDetector; this.proxyDetector = proxyDetector;
phantom = new ManagedChannelReference(this); phantom = new ManagedChannelReference(this);
this.channelStatsFactory = channelStatsFactory; this.channelTracerFactory = channelTracerFactory;
channelStats = channelStatsFactory.create(); channelTracer = channelTracerFactory.create();
logger.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target}); logger.log(Level.FINE, "[{0}] Created with target {1}", new Object[] {getLogId(), target});
} }
@ -620,7 +629,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Internal
callOptions, callOptions,
transportProvider, transportProvider,
terminated ? null : transportFactory.getScheduledExecutorService(), terminated ? null : transportFactory.getScheduledExecutorService(),
channelStats) channelTracer)
.setFullStreamDecompression(fullStreamDecompression) .setFullStreamDecompression(fullStreamDecompression)
.setDecompressorRegistry(decompressorRegistry) .setDecompressorRegistry(decompressorRegistry)
.setCompressorRegistry(compressorRegistry); .setCompressorRegistry(compressorRegistry);
@ -817,7 +826,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Internal
checkState(!terminated, "Channel is terminated"); checkState(!terminated, "Channel is terminated");
final OobChannel oobChannel = new OobChannel( final OobChannel oobChannel = new OobChannel(
authority, oobExecutorPool, transportFactory.getScheduledExecutorService(), authority, oobExecutorPool, transportFactory.getScheduledExecutorService(),
channelExecutor, channelStatsFactory.create()); channelExecutor, channelTracerFactory.create());
final InternalSubchannel internalSubchannel = new InternalSubchannel( final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroup, authority, userAgent, backoffPolicyProvider, transportFactory, addressGroup, authority, userAgent, backoffPolicyProvider, transportFactory,
transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor, transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,

View File

@ -19,12 +19,15 @@ package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.ClientCall; import io.grpc.ClientCall;
import io.grpc.ConnectivityStateInfo; import io.grpc.ConnectivityStateInfo;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalChannelStats;
import io.grpc.InternalLogId; import io.grpc.InternalLogId;
import io.grpc.InternalWithLogId; import io.grpc.InternalWithLogId;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
@ -65,7 +68,7 @@ final class OobChannel extends ManagedChannel implements InternalWithLogId {
private final ScheduledExecutorService deadlineCancellationExecutor; private final ScheduledExecutorService deadlineCancellationExecutor;
private final CountDownLatch terminatedLatch = new CountDownLatch(1); private final CountDownLatch terminatedLatch = new CountDownLatch(1);
private volatile boolean shutdown; private volatile boolean shutdown;
final ChannelStats channelStats; private final ChannelTracer channelTracer;
private final ClientTransportProvider transportProvider = new ClientTransportProvider() { private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
@Override @Override
@ -86,7 +89,7 @@ final class OobChannel extends ManagedChannel implements InternalWithLogId {
OobChannel( OobChannel(
String authority, ObjectPool<? extends Executor> executorPool, String authority, ObjectPool<? extends Executor> executorPool,
ScheduledExecutorService deadlineCancellationExecutor, ChannelExecutor channelExecutor, ScheduledExecutorService deadlineCancellationExecutor, ChannelExecutor channelExecutor,
ChannelStats channelStats) { ChannelTracer channelTracer) {
this.authority = checkNotNull(authority, "authority"); this.authority = checkNotNull(authority, "authority");
this.executorPool = checkNotNull(executorPool, "executorPool"); this.executorPool = checkNotNull(executorPool, "executorPool");
this.executor = checkNotNull(executorPool.getObject(), "executor"); this.executor = checkNotNull(executorPool.getObject(), "executor");
@ -114,7 +117,7 @@ final class OobChannel extends ManagedChannel implements InternalWithLogId {
// Don't care // Don't care
} }
}); });
this.channelStats = channelStats; this.channelTracer = channelTracer;
} }
// Must be called only once, right after the OobChannel is created. // Must be called only once, right after the OobChannel is created.
@ -168,7 +171,7 @@ final class OobChannel extends ManagedChannel implements InternalWithLogId {
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) { MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
return new ClientCallImpl<RequestT, ResponseT>(methodDescriptor, return new ClientCallImpl<RequestT, ResponseT>(methodDescriptor,
callOptions.getExecutor() == null ? executor : callOptions.getExecutor(), callOptions.getExecutor() == null ? executor : callOptions.getExecutor(),
callOptions, transportProvider, deadlineCancellationExecutor, channelStats); callOptions, transportProvider, deadlineCancellationExecutor, channelTracer);
} }
@Override @Override
@ -243,4 +246,11 @@ final class OobChannel extends ManagedChannel implements InternalWithLogId {
Subchannel getSubchannel() { Subchannel getSubchannel() {
return subchannelImpl; return subchannelImpl;
} }
@Override
public ListenableFuture<InternalChannelStats> getStats() {
SettableFuture<InternalChannelStats> ret = SettableFuture.create();
ret.set(channelTracer.getStats());
return ret;
}
} }

View File

@ -89,7 +89,7 @@ public class ClientCallImplTest {
private final FakeClock fakeClock = new FakeClock(); private final FakeClock fakeClock = new FakeClock();
private final ScheduledExecutorService deadlineCancellationExecutor = private final ScheduledExecutorService deadlineCancellationExecutor =
fakeClock.getScheduledExecutorService(); fakeClock.getScheduledExecutorService();
private final ChannelStats channaleStats = ChannelStats.getDefaultFactory().create(); private final ChannelTracer channelTracer = ChannelTracer.getDefaultFactory().create();
private final DecompressorRegistry decompressorRegistry = private final DecompressorRegistry decompressorRegistry =
DecompressorRegistry.getDefaultInstance().with(new Codec.Gzip(), true); DecompressorRegistry.getDefaultInstance().with(new Codec.Gzip(), true);
private final MethodDescriptor<Void, Void> method = MethodDescriptor.<Void, Void>newBuilder() private final MethodDescriptor<Void, Void> method = MethodDescriptor.<Void, Void>newBuilder()
@ -151,7 +151,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats); channelTracer);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture()); verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@ -172,7 +172,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats); channelTracer);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture()); verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@ -208,7 +208,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats); channelTracer);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture()); verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@ -242,7 +242,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats); channelTracer);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture()); verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@ -275,7 +275,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats) channelTracer)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
@ -298,7 +298,7 @@ public class ClientCallImplTest {
baseCallOptions.withAuthority("overridden-authority"), baseCallOptions.withAuthority("overridden-authority"),
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats) channelTracer)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
@ -314,7 +314,7 @@ public class ClientCallImplTest {
callOptions, callOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats) channelTracer)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
final Metadata metadata = new Metadata(); final Metadata metadata = new Metadata();
@ -332,7 +332,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats) channelTracer)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
@ -501,7 +501,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats) channelTracer)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
Context.ROOT.attach(); Context.ROOT.attach();
@ -575,7 +575,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats) channelTracer)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
previous.attach(); previous.attach();
@ -604,7 +604,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats) channelTracer)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
previous.attach(); previous.attach();
@ -648,7 +648,7 @@ public class ClientCallImplTest {
callOptions, callOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats) channelTracer)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
verify(transport, times(0)) verify(transport, times(0))
@ -671,7 +671,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats); channelTracer);
Metadata headers = new Metadata(); Metadata headers = new Metadata();
@ -699,7 +699,7 @@ public class ClientCallImplTest {
callOpts, callOpts,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats); channelTracer);
Metadata headers = new Metadata(); Metadata headers = new Metadata();
@ -727,7 +727,7 @@ public class ClientCallImplTest {
callOpts, callOpts,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats); channelTracer);
Metadata headers = new Metadata(); Metadata headers = new Metadata();
@ -753,7 +753,7 @@ public class ClientCallImplTest {
baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)), baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats); channelTracer);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
@ -777,7 +777,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats); channelTracer);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
@ -797,7 +797,7 @@ public class ClientCallImplTest {
baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)), baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats); channelTracer);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
call.cancel("canceled", null); call.cancel("canceled", null);
@ -821,7 +821,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats); channelTracer);
Metadata headers = new Metadata(); Metadata headers = new Metadata();
@ -838,7 +838,7 @@ public class ClientCallImplTest {
baseCallOptions, baseCallOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats); channelTracer);
final Exception cause = new Exception(); final Exception cause = new Exception();
ClientCall.Listener<Void> callListener = ClientCall.Listener<Void> callListener =
new ClientCall.Listener<Void>() { new ClientCall.Listener<Void>() {
@ -875,7 +875,7 @@ public class ClientCallImplTest {
callOptions, callOptions,
provider, provider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channaleStats) channelTracer)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
@ -888,7 +888,7 @@ public class ClientCallImplTest {
public void getAttributes() { public void getAttributes() {
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>( ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method, MoreExecutors.directExecutor(), baseCallOptions, provider, method, MoreExecutors.directExecutor(), baseCallOptions, provider,
deadlineCancellationExecutor, channaleStats); deadlineCancellationExecutor, channelTracer);
Attributes attrs = Attributes attrs =
Attributes.newBuilder().set(Key.<String>of("fake key"), "fake value").build(); Attributes.newBuilder().set(Key.<String>of("fake key"), "fake value").build();
when(stream.getAttributes()).thenReturn(attrs); when(stream.getAttributes()).thenReturn(attrs);

View File

@ -143,7 +143,7 @@ public class ManagedChannelImplIdlenessTest {
builder, mockTransportFactory, new FakeBackoffPolicyProvider(), builder, mockTransportFactory, new FakeBackoffPolicyProvider(),
oobExecutorPool, timer.getStopwatchSupplier(), oobExecutorPool, timer.getStopwatchSupplier(),
Collections.<ClientInterceptor>emptyList(), Collections.<ClientInterceptor>emptyList(),
GrpcUtil.NOOP_PROXY_DETECTOR, ChannelStats.getDefaultFactory()); GrpcUtil.NOOP_PROXY_DETECTOR, ChannelTracer.getDefaultFactory());
newTransports = TestUtils.captureTransports(mockTransportFactory); newTransports = TestUtils.captureTransports(mockTransportFactory);
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {

View File

@ -60,6 +60,8 @@ import io.grpc.ConnectivityStateInfo;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.IntegerMarshaller; import io.grpc.IntegerMarshaller;
import io.grpc.InternalChannelStats;
import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId; import io.grpc.InternalLogId;
import io.grpc.LoadBalancer; import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
@ -138,15 +140,15 @@ public class ManagedChannelImplTest {
private final FakeClock timer = new FakeClock(); private final FakeClock timer = new FakeClock();
private final FakeClock executor = new FakeClock(); private final FakeClock executor = new FakeClock();
private final FakeClock oobExecutor = new FakeClock(); private final FakeClock oobExecutor = new FakeClock();
private final ChannelStats.Factory channelStatsFactory = new ChannelStats.Factory() { private final ChannelTracer.Factory channelStatsFactory = new ChannelTracer.Factory() {
@Override @Override
public ChannelStats create() { public ChannelTracer create() {
return new ChannelStats(new ChannelStats.TimeProvider() { return new ChannelTracer(new ChannelTracer.TimeProvider() {
@Override @Override
public long currentTimeMillis() { public long currentTimeMillis() {
return executor.currentTimeMillis(); return executor.currentTimeMillis();
} }
}); });
} }
}; };
@ -1662,10 +1664,10 @@ public class ManagedChannelImplTest {
public void channelStat_callStarted() throws Exception { public void channelStat_callStarted() throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT); ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
assertEquals(0, channel.channelStats.getCallsStarted()); assertEquals(0, getStats(channel).callsStarted);
call.start(mockCallListener, new Metadata()); call.start(mockCallListener, new Metadata());
assertEquals(1, channel.channelStats.getCallsStarted()); assertEquals(1, getStats(channel).callsStarted);
assertEquals(executor.currentTimeMillis(), channel.channelStats.getLastCallStartedMillis()); assertEquals(executor.currentTimeMillis(), getStats(channel).lastCallStartedMillis);
} }
@Test @Test
@ -1701,12 +1703,12 @@ public class ManagedChannelImplTest {
// the actual test // the actual test
ClientStreamListener streamListener = streamListenerCaptor.getValue(); ClientStreamListener streamListener = streamListenerCaptor.getValue();
call.halfClose(); call.halfClose();
assertEquals(0, channel.channelStats.getCallsSucceeded()); assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(0, channel.channelStats.getCallsFailed()); assertEquals(0, getStats(channel).callsFailed);
streamListener.closed(Status.OK, new Metadata()); streamListener.closed(Status.OK, new Metadata());
executor.runDueTasks(); executor.runDueTasks();
assertEquals(1, channel.channelStats.getCallsSucceeded()); assertEquals(1, getStats(channel).callsSucceeded);
assertEquals(0, channel.channelStats.getCallsFailed()); assertEquals(0, getStats(channel).callsFailed);
} }
@Test @Test
@ -1716,12 +1718,12 @@ public class ManagedChannelImplTest {
call.start(mockCallListener, new Metadata()); call.start(mockCallListener, new Metadata());
call.cancel("msg", null); call.cancel("msg", null);
assertEquals(0, channel.channelStats.getCallsSucceeded()); assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(0, channel.channelStats.getCallsFailed()); assertEquals(0, getStats(channel).callsFailed);
executor.runDueTasks(); executor.runDueTasks();
verify(mockCallListener).onClose(any(Status.class), any(Metadata.class)); verify(mockCallListener).onClose(any(Status.class), any(Metadata.class));
assertEquals(0, channel.channelStats.getCallsSucceeded()); assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(1, channel.channelStats.getCallsFailed()); assertEquals(1, getStats(channel).callsFailed);
} }
@Test @Test
@ -1730,12 +1732,12 @@ public class ManagedChannelImplTest {
OobChannel oob1 = (OobChannel) helper.createOobChannel(addressGroup, "oob1authority"); OobChannel oob1 = (OobChannel) helper.createOobChannel(addressGroup, "oob1authority");
ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT); ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT);
assertEquals(0, channel.channelStats.getCallsStarted()); assertEquals(0, getStats(channel).callsStarted);
call.start(mockCallListener, new Metadata()); call.start(mockCallListener, new Metadata());
// only oob channel stats updated // only oob channel stats updated
assertEquals(1, oob1.channelStats.getCallsStarted()); assertEquals(1, getStats(oob1).callsStarted);
assertEquals(0, channel.channelStats.getCallsStarted()); assertEquals(0, getStats(channel).callsStarted);
assertEquals(executor.currentTimeMillis(), oob1.channelStats.getLastCallStartedMillis()); assertEquals(executor.currentTimeMillis(), getStats(oob1).lastCallStartedMillis);
} }
@Test @Test
@ -1765,15 +1767,15 @@ public class ManagedChannelImplTest {
// the actual test // the actual test
ClientStreamListener streamListener = streamListenerCaptor.getValue(); ClientStreamListener streamListener = streamListenerCaptor.getValue();
call.halfClose(); call.halfClose();
assertEquals(0, oobChannel.channelStats.getCallsSucceeded()); assertEquals(0, getStats(oobChannel).callsSucceeded);
assertEquals(0, oobChannel.channelStats.getCallsFailed()); assertEquals(0, getStats(oobChannel).callsFailed);
streamListener.closed(Status.OK, new Metadata()); streamListener.closed(Status.OK, new Metadata());
callExecutor.runDueTasks(); callExecutor.runDueTasks();
// only oob channel stats updated // only oob channel stats updated
assertEquals(1, oobChannel.channelStats.getCallsSucceeded()); assertEquals(1, getStats(oobChannel).callsSucceeded);
assertEquals(0, oobChannel.channelStats.getCallsFailed()); assertEquals(0, getStats(oobChannel).callsFailed);
assertEquals(0, channel.channelStats.getCallsSucceeded()); assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(0, channel.channelStats.getCallsFailed()); assertEquals(0, getStats(channel).callsFailed);
} }
@Test @Test
@ -1784,15 +1786,15 @@ public class ManagedChannelImplTest {
call.start(mockCallListener, new Metadata()); call.start(mockCallListener, new Metadata());
call.cancel("msg", null); call.cancel("msg", null);
assertEquals(0, channel.channelStats.getCallsSucceeded()); assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(0, channel.channelStats.getCallsFailed()); assertEquals(0, getStats(channel).callsFailed);
oobExecutor.runDueTasks(); oobExecutor.runDueTasks();
// only oob channel stats updated // only oob channel stats updated
verify(mockCallListener).onClose(any(Status.class), any(Metadata.class)); verify(mockCallListener).onClose(any(Status.class), any(Metadata.class));
assertEquals(0, oob1.channelStats.getCallsSucceeded()); assertEquals(0, getStats(oob1).callsSucceeded);
assertEquals(1, oob1.channelStats.getCallsFailed()); assertEquals(1, getStats(oob1).callsFailed);
assertEquals(0, channel.channelStats.getCallsSucceeded()); assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(0, channel.channelStats.getCallsFailed()); assertEquals(0, getStats(channel).callsFailed);
} }
private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider { private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
@ -1907,4 +1909,9 @@ public class ManagedChannelImplTest {
return "fake"; return "fake";
} }
} }
private static InternalChannelStats getStats(
InternalInstrumented<InternalChannelStats> instrumented) throws Exception {
return instrumented.getStats().get();
}
} }