core: add subchannel stats (#3967)

This commit is contained in:
zpencer 2018-02-09 10:50:11 -08:00 committed by GitHub
parent d2c7e33f3e
commit 35f0d15291
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 581 additions and 119 deletions

View File

@ -89,6 +89,7 @@ public final class EquivalentAddressGroup {
@Override
public String toString() {
// TODO(zpencer): Summarize return value if addr is very large
return "[addrs=" + addrs + ", attrs=" + attrs + "]";
}

View File

@ -16,6 +16,8 @@
package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.LoadBalancer;
import io.grpc.internal.Channelz.ChannelStats;
import javax.annotation.Nullable;
@ -24,9 +26,7 @@ import javax.annotation.Nullable;
* The base interface of the Subchannels returned by {@link
* io.grpc.LoadBalancer.Helper#createSubchannel}.
*/
abstract class AbstractSubchannel extends LoadBalancer.Subchannel
implements Instrumented<ChannelStats> {
private final LogId logId = LogId.allocate(getClass().getName());
abstract class AbstractSubchannel extends LoadBalancer.Subchannel {
/**
* Same as {@link InternalSubchannel#obtainActiveTransport}.
@ -34,8 +34,9 @@ abstract class AbstractSubchannel extends LoadBalancer.Subchannel
@Nullable
abstract ClientTransport obtainActiveTransport();
@Override
public LogId getLogId() {
return logId;
}
/**
* Same as {@link InternalSubchannel#getStats()}.
*/
@VisibleForTesting
abstract ListenableFuture<ChannelStats> getStats();
}

View File

@ -0,0 +1,102 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import io.grpc.Attributes;
import io.grpc.Compressor;
import io.grpc.DecompressorRegistry;
import io.grpc.Status;
import java.io.InputStream;
abstract class ForwardingClientStream implements ClientStream {
protected abstract ClientStream delegate();
@Override
public void request(int numMessages) {
delegate().request(numMessages);
}
@Override
public void writeMessage(InputStream message) {
delegate().writeMessage(message);
}
@Override
public void flush() {
delegate().flush();
}
@Override
public boolean isReady() {
return delegate().isReady();
}
@Override
public void setCompressor(Compressor compressor) {
delegate().setCompressor(compressor);
}
@Override
public void setMessageCompression(boolean enable) {
delegate().setMessageCompression(enable);
}
@Override
public void cancel(Status reason) {
delegate().cancel(reason);
}
@Override
public void halfClose() {
delegate().halfClose();
}
@Override
public void setAuthority(String authority) {
delegate().setAuthority(authority);
}
@Override
public void setFullStreamDecompression(boolean fullStreamDecompression) {
delegate().setFullStreamDecompression(fullStreamDecompression);
}
@Override
public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
delegate().setDecompressorRegistry(decompressorRegistry);
}
@Override
public void start(ClientStreamListener listener) {
delegate().start(listener);
}
@Override
public void setMaxInboundMessageSize(int maxSize) {
delegate().setMaxInboundMessageSize(maxSize);
}
@Override
public void setMaxOutboundMessageSize(int maxSize) {
delegate().setMaxOutboundMessageSize(maxSize);
}
@Override
public Attributes getAttributes() {
return delegate().getAttributes();
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import io.grpc.Metadata;
import io.grpc.Status;
abstract class ForwardingClientStreamListener implements ClientStreamListener {
protected abstract ClientStreamListener delegate();
@Override
public void headersRead(Metadata headers) {
delegate().headersRead(headers);
}
@Override
public void closed(Status status, Metadata trailers) {
delegate().closed(status, trailers);
}
@Override
public void messagesAvailable(MessageProducer producer) {
delegate().messagesAvailable(producer);
}
@Override
public void onReady() {
delegate().onReady();
}
}

View File

@ -23,5 +23,9 @@ import com.google.common.util.concurrent.ListenableFuture;
* support instrumentation, then the future will return a {@code null}.
*/
public interface Instrumented<T> extends WithLogId {
/**
* Returns the stats object.
*/
ListenableFuture<T> getStats();
}

View File

@ -26,11 +26,17 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.ForOverride;
import io.grpc.CallOptions;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.Channelz.ChannelStats;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
@ -48,7 +54,7 @@ import javax.annotation.concurrent.ThreadSafe;
* Transports for a single {@link SocketAddress}.
*/
@ThreadSafe
final class InternalSubchannel implements WithLogId {
final class InternalSubchannel implements Instrumented<ChannelStats> {
private static final Logger log = Logger.getLogger(InternalSubchannel.class.getName());
private final LogId logId = LogId.allocate(getClass().getName());
@ -58,6 +64,7 @@ final class InternalSubchannel implements WithLogId {
private final Callback callback;
private final ClientTransportFactory transportFactory;
private final ScheduledExecutorService scheduledExecutor;
private final CallTracer callsTracer;
// File-specific convention: methods without GuardedBy("lock") MUST NOT be called under the lock.
private final Object lock = new Object();
@ -152,7 +159,7 @@ final class InternalSubchannel implements WithLogId {
BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
Supplier<Stopwatch> stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback,
ProxyDetector proxyDetector) {
ProxyDetector proxyDetector, CallTracer callsTracer) {
this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup");
this.authority = authority;
this.userAgent = userAgent;
@ -163,6 +170,7 @@ final class InternalSubchannel implements WithLogId {
this.channelExecutor = channelExecutor;
this.callback = callback;
this.proxyDetector = proxyDetector;
this.callsTracer = callsTracer;
}
/**
@ -206,7 +214,9 @@ final class InternalSubchannel implements WithLogId {
ProxyParameters proxy = proxyDetector.proxyFor(address);
ConnectionClientTransport transport =
transportFactory.newClientTransport(address, authority, userAgent, proxy);
new CallTracingTransport(
transportFactory.newClientTransport(address, authority, userAgent, proxy),
callsTracer);
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] Created {1} for {2}",
new Object[] {logId, transport.getLogId(), address});
@ -436,6 +446,19 @@ final class InternalSubchannel implements WithLogId {
return logId;
}
@Override
public ListenableFuture<ChannelStats> getStats() {
SettableFuture<ChannelStats> ret = SettableFuture.create();
ChannelStats.Builder builder = new ChannelStats.Builder();
synchronized (lock) {
builder.setTarget(addressGroup.toString()).setState(getState());
}
callsTracer.updateBuilder(builder);
ret.set(builder.build());
return ret;
}
@VisibleForTesting
ConnectivityState getState() {
try {
@ -582,4 +605,49 @@ final class InternalSubchannel implements WithLogId {
@ForOverride
void onNotInUse(InternalSubchannel is) { }
}
@VisibleForTesting
static final class CallTracingTransport extends ForwardingConnectionClientTransport {
private final ConnectionClientTransport delegate;
private final CallTracer callTracer;
private CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer) {
this.delegate = delegate;
this.callTracer = callTracer;
}
@Override
protected ConnectionClientTransport delegate() {
return delegate;
}
@Override
public ClientStream newStream(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
final ClientStream streamDelegate = super.newStream(method, headers, callOptions);
return new ForwardingClientStream() {
@Override
protected ClientStream delegate() {
return streamDelegate;
}
@Override
public void start(final ClientStreamListener listener) {
callTracer.reportCallStarted();
super.start(new ForwardingClientStreamListener() {
@Override
protected ClientStreamListener delegate() {
return listener;
}
@Override
public void closed(Status status, Metadata trailers) {
callTracer.reportCallEnded(status.isOk());
super.closed(status, trailers);
}
});
}
};
}
}
}

View File

@ -867,7 +867,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
checkNotNull(attrs, "attrs");
// TODO(ejona): can we be even stricter? Like loadBalancer == null?
checkState(!terminated, "Channel is terminated");
final SubchannelImpl subchannel = new SubchannelImpl(attrs, callTracerFactory.create());
final SubchannelImpl subchannel = new SubchannelImpl(attrs);
final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroup, authority(), userAgent, backoffPolicyProvider, transportFactory,
transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
@ -898,7 +898,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
inUseStateAggregator.updateObjectInUse(is, false);
}
},
proxyDetector);
proxyDetector,
callTracerFactory.create());
subchannel.subchannel = internalSubchannel;
logger.log(Level.FINE, "[{0}] {1} created for {2}",
new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroup});
@ -961,7 +962,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
checkState(!terminated, "Channel is terminated");
final OobChannel oobChannel = new OobChannel(
authority, oobExecutorPool, transportFactory.getScheduledExecutorService(),
channelExecutor, callTracerFactory);
channelExecutor, callTracerFactory.create());
final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroup, authority, userAgent, backoffPolicyProvider, transportFactory,
transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
@ -980,7 +981,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
oobChannel.handleSubchannelStateChange(newState);
}
},
proxyDetector);
proxyDetector,
callTracerFactory.create());
oobChannel.setSubchannel(internalSubchannel);
runSerialized(new Runnable() {
@Override
@ -1111,16 +1113,14 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
InternalSubchannel subchannel;
final Object shutdownLock = new Object();
final Attributes attrs;
final CallTracer subchannelCallTracer;
@GuardedBy("shutdownLock")
boolean shutdownRequested;
@GuardedBy("shutdownLock")
ScheduledFuture<?> delayedShutdownTask;
SubchannelImpl(Attributes attrs, CallTracer subchannelCallTracer) {
SubchannelImpl(Attributes attrs) {
this.attrs = checkNotNull(attrs, "attrs");
this.subchannelCallTracer = subchannelCallTracer;
}
@Override
@ -1128,6 +1128,11 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
return subchannel.obtainActiveTransport();
}
@Override
ListenableFuture<ChannelStats> getStats() {
return subchannel.getStats();
}
@Override
public void shutdown() {
synchronized (shutdownLock) {
@ -1188,16 +1193,6 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
public String toString() {
return subchannel.getLogId().toString();
}
@Override
public ListenableFuture<ChannelStats> getStats() {
SettableFuture<ChannelStats> ret = SettableFuture.create();
ChannelStats.Builder builder = new Channelz.ChannelStats.Builder();
subchannelCallTracer.updateBuilder(builder);
builder.setTarget(target).setState(subchannel.getState());
ret.set(builder.build());
return ret;
}
}
@VisibleForTesting

View File

@ -67,7 +67,6 @@ final class OobChannel extends ManagedChannel implements Instrumented<ChannelSta
private final CountDownLatch terminatedLatch = new CountDownLatch(1);
private volatile boolean shutdown;
private final CallTracer channelCallsTracer;
private final CallTracer subchannelCallsTracer;
private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
@Override
@ -88,7 +87,7 @@ final class OobChannel extends ManagedChannel implements Instrumented<ChannelSta
OobChannel(
String authority, ObjectPool<? extends Executor> executorPool,
ScheduledExecutorService deadlineCancellationExecutor, ChannelExecutor channelExecutor,
CallTracer.Factory callTracerFactory) {
CallTracer callsTracer) {
this.authority = checkNotNull(authority, "authority");
this.executorPool = checkNotNull(executorPool, "executorPool");
this.executor = checkNotNull(executorPool.getObject(), "executor");
@ -116,8 +115,7 @@ final class OobChannel extends ManagedChannel implements Instrumented<ChannelSta
// Don't care
}
});
this.channelCallsTracer = callTracerFactory.create();
this.subchannelCallsTracer = callTracerFactory.create();
this.channelCallsTracer = callsTracer;
}
// Must be called only once, right after the OobChannel is created.
@ -135,6 +133,11 @@ final class OobChannel extends ManagedChannel implements Instrumented<ChannelSta
return subchannel.obtainActiveTransport();
}
@Override
ListenableFuture<ChannelStats> getStats() {
return subchannel.getStats();
}
@Override
public void requestConnection() {
subchannel.obtainActiveTransport();
@ -149,16 +152,6 @@ final class OobChannel extends ManagedChannel implements Instrumented<ChannelSta
public Attributes getAttributes() {
return Attributes.EMPTY;
}
@Override
public ListenableFuture<ChannelStats> getStats() {
SettableFuture<ChannelStats> ret = SettableFuture.create();
ChannelStats.Builder builder = new ChannelStats.Builder();
subchannelCallsTracer.updateBuilder(builder);
builder.setTarget(authority).setState(subchannel.getState());
ret.set(builder.build());
return ret;
}
};
subchannelPicker = new SubchannelPicker() {

View File

@ -0,0 +1,74 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import io.grpc.ForwardingTestUtil;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.StreamListener.MessageProducer;
import java.lang.reflect.Method;
import java.util.Collections;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class ForwardingClientStreamListenerTest {
private ClientStreamListener mock = mock(ClientStreamListener.class);
private ForwardingClientStreamListener forward = new ForwardingClientStreamListener() {
@Override
protected ClientStreamListener delegate() {
return mock;
}
};
@Test
public void allMethodsForwarded() throws Exception {
ForwardingTestUtil.testMethodsForwarded(
ClientStreamListener.class,
mock,
forward,
Collections.<Method>emptyList());
}
@Test
public void headersReadTest() {
Metadata headers = new Metadata();
forward.headersRead(headers);
verify(mock).headersRead(same(headers));
}
@Test
public void closedTest() {
Status status = Status.UNKNOWN;
Metadata trailers = new Metadata();
forward.closed(status, trailers);
verify(mock).closed(same(status), same(trailers));
}
@Test
public void messagesAvailableTest() {
MessageProducer producer = mock(MessageProducer.class);
forward.messagesAvailable(producer);
verify(mock).messagesAvailable(same(producer));
}
}

View File

@ -0,0 +1,156 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.grpc.Attributes;
import io.grpc.Compressor;
import io.grpc.Decompressor;
import io.grpc.DecompressorRegistry;
import io.grpc.ForwardingTestUtil;
import io.grpc.Status;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.Collections;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class ForwardingClientStreamTest {
private ClientStream mock = mock(ClientStream.class);
private ForwardingClientStream forward = new ForwardingClientStream() {
@Override
protected ClientStream delegate() {
return mock;
}
};
@Test
public void allMethodsForwarded() throws Exception {
ForwardingTestUtil.testMethodsForwarded(
ClientStream.class,
mock,
forward,
Collections.<Method>emptyList());
}
@Test
public void requestTest() {
forward.request(1234);
verify(mock).request(1234);
}
@Test
public void writeMessageTest() {
InputStream is = mock(InputStream.class);
forward.writeMessage(is);
verify(mock).writeMessage(same(is));
}
@Test
public void isReadyTest() {
when(mock.isReady()).thenReturn(true);
assertEquals(true, forward.isReady());
}
@Test
public void setCompressorTest() {
Compressor compressor = mock(Compressor.class);
forward.setCompressor(compressor);
verify(mock).setCompressor(same(compressor));
}
@Test
public void setMessageCompressionTest() {
forward.setMessageCompression(true);
verify(mock).setMessageCompression(true);
}
@Test
public void cancelTest() {
Status reason = Status.UNKNOWN;
forward.cancel(reason);
verify(mock).cancel(same(reason));
}
@Test
public void setAuthorityTest() {
String authority = "authority";
forward.setAuthority(authority);
verify(mock).setAuthority(authority);
}
@Test
public void setFullStreamDecompressionTest() {
forward.setFullStreamDecompression(true);
verify(mock).setFullStreamDecompression(true);
}
@Test
public void setDecompressorRegistryTest() {
DecompressorRegistry decompressor =
DecompressorRegistry.emptyInstance().with(new Decompressor() {
@Override
public String getMessageEncoding() {
return "some-encoding";
}
@Override
public InputStream decompress(InputStream is) throws IOException {
return is;
}
}, true);
forward.setDecompressorRegistry(decompressor);
verify(mock).setDecompressorRegistry(same(decompressor));
}
@Test
public void startTest() {
ClientStreamListener listener = mock(ClientStreamListener.class);
forward.start(listener);
verify(mock).start(same(listener));
}
@Test
public void setMaxInboundMessageSizeTest() {
int size = 4567;
forward.setMaxInboundMessageSize(size);
verify(mock).setMaxInboundMessageSize(size);;
}
@Test
public void setMaxOutboundMessageSizeTest() {
int size = 6789;
forward.setMaxOutboundMessageSize(size);
verify(mock).setMaxOutboundMessageSize(size);
}
@Test
public void getAttributesTest() {
Attributes attr = Attributes.newBuilder().build();
when(mock.getAttributes()).thenReturn(attr);
assertSame(attr, forward.getAttributes());
}
}

View File

@ -41,6 +41,7 @@ import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.Status;
import io.grpc.internal.InternalSubchannel.CallTracingTransport;
import io.grpc.internal.TestUtils.MockClientTransportInfo;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -205,7 +206,9 @@ public class InternalSubchannelTest {
transports.peek().listener.transportReady();
assertExactCallbackInvokes("onStateChange:READY");
assertEquals(READY, internalSubchannel.getState());
assertSame(transports.peek().transport, internalSubchannel.obtainActiveTransport());
assertSame(
transports.peek().transport,
((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate());
// Close the READY transport, will enter IDLE state.
assertNoCallbackInvoke();
@ -323,7 +326,9 @@ public class InternalSubchannelTest {
assertExactCallbackInvokes("onStateChange:READY");
assertEquals(READY, internalSubchannel.getState());
assertSame(transports.peek().transport, internalSubchannel.obtainActiveTransport());
assertSame(
transports.peek().transport,
((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate());
// Then close it.
assertNoCallbackInvoke();
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
@ -952,7 +957,7 @@ public class InternalSubchannelTest {
internalSubchannel = new InternalSubchannel(addressGroup, AUTHORITY, USER_AGENT,
mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(),
fakeClock.getStopwatchSupplier(), channelExecutor, mockInternalSubchannelCallback,
proxyDetector);
proxyDetector, CallTracer.getDefaultFactory().create());
}
private void assertNoCallbackInvoke() {

View File

@ -1671,7 +1671,7 @@ public class ManagedChannelImplTest {
assertEquals(target, getStats(channel).target);
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
assertEquals(target, getStats((AbstractSubchannel) subchannel).target);
assertEquals(addressGroup.toString(), getStats((AbstractSubchannel) subchannel).target);
}
@Test
@ -1719,128 +1719,142 @@ public class ManagedChannelImplTest {
}
@Test
public void channelStat_callEndSuccess() throws Exception {
// set up
Metadata headers = new Metadata();
ClientStream mockStream = mock(ClientStream.class);
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
// Start a call with a call executor
CallOptions options =
CallOptions.DEFAULT.withExecutor(executor.getScheduledExecutorService());
ClientCall<String, Integer> call = channel.newCall(method, options);
call.start(mockCallListener, headers);
// Make the transport available
Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY);
subchannel.requestConnection();
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ManagedClientTransport.Listener transportListener = transportInfo.listener;
when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class)))
.thenReturn(mockStream);
transportListener.transportReady();
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel));
helper.updateBalancingState(READY, mockPicker);
executor.runDueTasks();
verify(mockStream).start(streamListenerCaptor.capture());
// end set up
// the actual test
ClientStreamListener streamListener = streamListenerCaptor.getValue();
call.halfClose();
assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(0, getStats(channel).callsFailed);
streamListener.closed(Status.OK, new Metadata());
executor.runDueTasks();
assertEquals(1, getStats(channel).callsSucceeded);
assertEquals(0, getStats(channel).callsFailed);
public void channelsAndSubChannels_instrumented_success() throws Exception {
channelsAndSubchannels_instrumented0(true);
}
@Test
public void channelStat_callEndFail() throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata());
call.cancel("msg", null);
public void channelsAndSubChannels_instrumented_fail() throws Exception {
channelsAndSubchannels_instrumented0(false);
}
private void channelsAndSubchannels_instrumented0(boolean success) throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
// Channel stat bumped when ClientCall.start() called
assertEquals(0, getStats(channel).callsStarted);
call.start(mockCallListener, new Metadata());
assertEquals(1, getStats(channel).callsStarted);
ClientStream mockStream = mock(ClientStream.class);
ClientStreamTracer.Factory factory = mock(ClientStreamTracer.Factory.class);
AbstractSubchannel subchannel =
(AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY);
subchannel.requestConnection();
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportReady();
ClientTransport mockTransport = transportInfo.transport;
when(mockTransport.newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
.thenReturn(mockStream);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel, factory));
// subchannel stat bumped when call gets assigned to it
assertEquals(0, getStats(subchannel).callsStarted);
helper.updateBalancingState(READY, mockPicker);
assertEquals(1, executor.runDueTasks());
verify(mockStream).start(streamListenerCaptor.capture());
assertEquals(1, getStats(subchannel).callsStarted);
ClientStreamListener streamListener = streamListenerCaptor.getValue();
call.halfClose();
// closing stream listener affects subchannel stats immediately
assertEquals(0, getStats(subchannel).callsSucceeded);
assertEquals(0, getStats(subchannel).callsFailed);
streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata());
if (success) {
assertEquals(1, getStats(subchannel).callsSucceeded);
assertEquals(0, getStats(subchannel).callsFailed);
} else {
assertEquals(0, getStats(subchannel).callsSucceeded);
assertEquals(1, getStats(subchannel).callsFailed);
}
// channel stats bumped when the ClientCall.Listener is notified
assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(0, getStats(channel).callsFailed);
executor.runDueTasks();
verify(mockCallListener).onClose(any(Status.class), any(Metadata.class));
if (success) {
assertEquals(1, getStats(channel).callsSucceeded);
assertEquals(0, getStats(channel).callsFailed);
} else {
assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(1, getStats(channel).callsFailed);
}
@Test
public void channelStat_callStarted_oob() throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
OobChannel oob1 = (OobChannel) helper.createOobChannel(addressGroup, "oob1authority");
ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT);
assertEquals(0, getStats(channel).callsStarted);
call.start(mockCallListener, new Metadata());
// only oob channel stats updated
assertEquals(1, getStats(oob1).callsStarted);
assertEquals(0, getStats(channel).callsStarted);
assertEquals(executor.currentTimeMillis(), getStats(oob1).lastCallStartedMillis);
}
@Test
public void channelStat_callEndSuccess_oob() throws Exception {
public void channelsAndSubchannels_oob_instrumented_success() throws Exception {
channelsAndSubchannels_oob_instrumented0(true);
}
@Test
public void channelsAndSubchannels_oob_instrumented_fail() throws Exception {
channelsAndSubchannels_oob_instrumented0(false);
}
private void channelsAndSubchannels_oob_instrumented0(boolean success) throws Exception {
// set up
ClientStream mockStream = mock(ClientStream.class);
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority");
AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel();
FakeClock callExecutor = new FakeClock();
CallOptions options =
CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService());
ClientCall<String, Integer> call = oobChannel.newCall(method, options);
Metadata headers = new Metadata();
// Channel stat bumped when ClientCall.start() called
assertEquals(0, getStats(oobChannel).callsStarted);
call.start(mockCallListener, headers);
assertEquals(1, getStats(oobChannel).callsStarted);
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ManagedClientTransport.Listener transportListener = transportInfo.listener;
when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class)))
.thenReturn(mockStream);
// subchannel stat bumped when call gets assigned to it
assertEquals(0, getStats(oobSubchannel).callsStarted);
transportListener.transportReady();
callExecutor.runDueTasks();
verify(mockStream).start(streamListenerCaptor.capture());
// end set up
assertEquals(1, getStats(oobSubchannel).callsStarted);
// the actual test
ClientStreamListener streamListener = streamListenerCaptor.getValue();
call.halfClose();
assertEquals(0, getStats(oobChannel).callsSucceeded);
assertEquals(0, getStats(oobChannel).callsFailed);
streamListener.closed(Status.OK, new Metadata());
callExecutor.runDueTasks();
// only oob channel stats updated
assertEquals(1, getStats(oobChannel).callsSucceeded);
assertEquals(0, getStats(oobChannel).callsFailed);
assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(0, getStats(channel).callsFailed);
// closing stream listener affects subchannel stats immediately
assertEquals(0, getStats(oobSubchannel).callsSucceeded);
assertEquals(0, getStats(oobSubchannel).callsFailed);
streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata());
if (success) {
assertEquals(1, getStats(oobSubchannel).callsSucceeded);
assertEquals(0, getStats(oobSubchannel).callsFailed);
} else {
assertEquals(0, getStats(oobSubchannel).callsSucceeded);
assertEquals(1, getStats(oobSubchannel).callsFailed);
}
@Test
public void channelStat_callEndFail_oob() throws Exception {
createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR);
OobChannel oob1 = (OobChannel) helper.createOobChannel(addressGroup, "oob1authority");
ClientCall<String, Integer> call = oob1.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata());
call.cancel("msg", null);
assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(0, getStats(channel).callsFailed);
oobExecutor.runDueTasks();
// only oob channel stats updated
verify(mockCallListener).onClose(any(Status.class), any(Metadata.class));
assertEquals(0, getStats(oob1).callsSucceeded);
assertEquals(1, getStats(oob1).callsFailed);
// channel stats bumped when the ClientCall.Listener is notified
assertEquals(0, getStats(oobChannel).callsSucceeded);
assertEquals(0, getStats(oobChannel).callsFailed);
callExecutor.runDueTasks();
if (success) {
assertEquals(1, getStats(oobChannel).callsSucceeded);
assertEquals(0, getStats(oobChannel).callsFailed);
} else {
assertEquals(0, getStats(oobChannel).callsSucceeded);
assertEquals(1, getStats(oobChannel).callsFailed);
}
// oob channel is separate from the original channel
assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(0, getStats(channel).callsFailed);
}
@ -2057,6 +2071,10 @@ public class ManagedChannelImplTest {
}
}
private static ChannelStats getStats(AbstractSubchannel subchannel) throws Exception {
return subchannel.getStats().get();
}
private static ChannelStats getStats(
Instrumented<ChannelStats> instrumented) throws Exception {
return instrumented.getStats().get();