From 35f0d15291caaa5f21cd986429a17e7f986179b0 Mon Sep 17 00:00:00 2001 From: zpencer Date: Fri, 9 Feb 2018 10:50:11 -0800 Subject: [PATCH] core: add subchannel stats (#3967) --- .../java/io/grpc/EquivalentAddressGroup.java | 1 + .../io/grpc/internal/AbstractSubchannel.java | 15 +- .../grpc/internal/ForwardingClientStream.java | 102 +++++++++++ .../ForwardingClientStreamListener.java | 45 +++++ .../java/io/grpc/internal/Instrumented.java | 4 + .../io/grpc/internal/InternalSubchannel.java | 74 +++++++- .../io/grpc/internal/ManagedChannelImpl.java | 29 ++- .../java/io/grpc/internal/OobChannel.java | 21 +-- .../ForwardingClientStreamListenerTest.java | 74 ++++++++ .../internal/ForwardingClientStreamTest.java | 156 ++++++++++++++++ .../grpc/internal/InternalSubchannelTest.java | 11 +- .../grpc/internal/ManagedChannelImplTest.java | 168 ++++++++++-------- 12 files changed, 581 insertions(+), 119 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/ForwardingClientStream.java create mode 100644 core/src/main/java/io/grpc/internal/ForwardingClientStreamListener.java create mode 100644 core/src/test/java/io/grpc/internal/ForwardingClientStreamListenerTest.java create mode 100644 core/src/test/java/io/grpc/internal/ForwardingClientStreamTest.java diff --git a/core/src/main/java/io/grpc/EquivalentAddressGroup.java b/core/src/main/java/io/grpc/EquivalentAddressGroup.java index 6d570ae567..60f24e31da 100644 --- a/core/src/main/java/io/grpc/EquivalentAddressGroup.java +++ b/core/src/main/java/io/grpc/EquivalentAddressGroup.java @@ -89,6 +89,7 @@ public final class EquivalentAddressGroup { @Override public String toString() { + // TODO(zpencer): Summarize return value if addr is very large return "[addrs=" + addrs + ", attrs=" + attrs + "]"; } diff --git a/core/src/main/java/io/grpc/internal/AbstractSubchannel.java b/core/src/main/java/io/grpc/internal/AbstractSubchannel.java index 57109d69bb..39c2444480 100644 --- a/core/src/main/java/io/grpc/internal/AbstractSubchannel.java +++ b/core/src/main/java/io/grpc/internal/AbstractSubchannel.java @@ -16,6 +16,8 @@ package io.grpc.internal; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ListenableFuture; import io.grpc.LoadBalancer; import io.grpc.internal.Channelz.ChannelStats; import javax.annotation.Nullable; @@ -24,9 +26,7 @@ import javax.annotation.Nullable; * The base interface of the Subchannels returned by {@link * io.grpc.LoadBalancer.Helper#createSubchannel}. */ -abstract class AbstractSubchannel extends LoadBalancer.Subchannel - implements Instrumented { - private final LogId logId = LogId.allocate(getClass().getName()); +abstract class AbstractSubchannel extends LoadBalancer.Subchannel { /** * Same as {@link InternalSubchannel#obtainActiveTransport}. @@ -34,8 +34,9 @@ abstract class AbstractSubchannel extends LoadBalancer.Subchannel @Nullable abstract ClientTransport obtainActiveTransport(); - @Override - public LogId getLogId() { - return logId; - } + /** + * Same as {@link InternalSubchannel#getStats()}. + */ + @VisibleForTesting + abstract ListenableFuture getStats(); } diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStream.java b/core/src/main/java/io/grpc/internal/ForwardingClientStream.java new file mode 100644 index 0000000000..8ff72faba2 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStream.java @@ -0,0 +1,102 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import io.grpc.Attributes; +import io.grpc.Compressor; +import io.grpc.DecompressorRegistry; +import io.grpc.Status; +import java.io.InputStream; + +abstract class ForwardingClientStream implements ClientStream { + protected abstract ClientStream delegate(); + + @Override + public void request(int numMessages) { + delegate().request(numMessages); + } + + @Override + public void writeMessage(InputStream message) { + delegate().writeMessage(message); + } + + @Override + public void flush() { + delegate().flush(); + } + + @Override + public boolean isReady() { + return delegate().isReady(); + } + + @Override + public void setCompressor(Compressor compressor) { + delegate().setCompressor(compressor); + } + + @Override + public void setMessageCompression(boolean enable) { + delegate().setMessageCompression(enable); + } + + @Override + public void cancel(Status reason) { + delegate().cancel(reason); + } + + @Override + public void halfClose() { + delegate().halfClose(); + } + + @Override + public void setAuthority(String authority) { + delegate().setAuthority(authority); + } + + @Override + public void setFullStreamDecompression(boolean fullStreamDecompression) { + delegate().setFullStreamDecompression(fullStreamDecompression); + } + + @Override + public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) { + delegate().setDecompressorRegistry(decompressorRegistry); + } + + @Override + public void start(ClientStreamListener listener) { + delegate().start(listener); + } + + @Override + public void setMaxInboundMessageSize(int maxSize) { + delegate().setMaxInboundMessageSize(maxSize); + } + + @Override + public void setMaxOutboundMessageSize(int maxSize) { + delegate().setMaxOutboundMessageSize(maxSize); + } + + @Override + public Attributes getAttributes() { + return delegate().getAttributes(); + } +} diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStreamListener.java b/core/src/main/java/io/grpc/internal/ForwardingClientStreamListener.java new file mode 100644 index 0000000000..e371aa20f9 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStreamListener.java @@ -0,0 +1,45 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import io.grpc.Metadata; +import io.grpc.Status; + +abstract class ForwardingClientStreamListener implements ClientStreamListener { + + protected abstract ClientStreamListener delegate(); + + @Override + public void headersRead(Metadata headers) { + delegate().headersRead(headers); + } + + @Override + public void closed(Status status, Metadata trailers) { + delegate().closed(status, trailers); + } + + @Override + public void messagesAvailable(MessageProducer producer) { + delegate().messagesAvailable(producer); + } + + @Override + public void onReady() { + delegate().onReady(); + } +} diff --git a/core/src/main/java/io/grpc/internal/Instrumented.java b/core/src/main/java/io/grpc/internal/Instrumented.java index 70203a5b54..0a2001238d 100644 --- a/core/src/main/java/io/grpc/internal/Instrumented.java +++ b/core/src/main/java/io/grpc/internal/Instrumented.java @@ -23,5 +23,9 @@ import com.google.common.util.concurrent.ListenableFuture; * support instrumentation, then the future will return a {@code null}. */ public interface Instrumented extends WithLogId { + + /** + * Returns the stats object. + */ ListenableFuture getStats(); } diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java index 6f402ca84d..40a80d1f9d 100644 --- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java +++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java @@ -26,11 +26,17 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.errorprone.annotations.ForOverride; +import io.grpc.CallOptions; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import io.grpc.Status; +import io.grpc.internal.Channelz.ChannelStats; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collection; @@ -48,7 +54,7 @@ import javax.annotation.concurrent.ThreadSafe; * Transports for a single {@link SocketAddress}. */ @ThreadSafe -final class InternalSubchannel implements WithLogId { +final class InternalSubchannel implements Instrumented { private static final Logger log = Logger.getLogger(InternalSubchannel.class.getName()); private final LogId logId = LogId.allocate(getClass().getName()); @@ -58,6 +64,7 @@ final class InternalSubchannel implements WithLogId { private final Callback callback; private final ClientTransportFactory transportFactory; private final ScheduledExecutorService scheduledExecutor; + private final CallTracer callsTracer; // File-specific convention: methods without GuardedBy("lock") MUST NOT be called under the lock. private final Object lock = new Object(); @@ -152,7 +159,7 @@ final class InternalSubchannel implements WithLogId { BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, Supplier stopwatchSupplier, ChannelExecutor channelExecutor, Callback callback, - ProxyDetector proxyDetector) { + ProxyDetector proxyDetector, CallTracer callsTracer) { this.addressGroup = Preconditions.checkNotNull(addressGroup, "addressGroup"); this.authority = authority; this.userAgent = userAgent; @@ -163,6 +170,7 @@ final class InternalSubchannel implements WithLogId { this.channelExecutor = channelExecutor; this.callback = callback; this.proxyDetector = proxyDetector; + this.callsTracer = callsTracer; } /** @@ -206,7 +214,9 @@ final class InternalSubchannel implements WithLogId { ProxyParameters proxy = proxyDetector.proxyFor(address); ConnectionClientTransport transport = - transportFactory.newClientTransport(address, authority, userAgent, proxy); + new CallTracingTransport( + transportFactory.newClientTransport(address, authority, userAgent, proxy), + callsTracer); if (log.isLoggable(Level.FINE)) { log.log(Level.FINE, "[{0}] Created {1} for {2}", new Object[] {logId, transport.getLogId(), address}); @@ -436,6 +446,19 @@ final class InternalSubchannel implements WithLogId { return logId; } + + @Override + public ListenableFuture getStats() { + SettableFuture ret = SettableFuture.create(); + ChannelStats.Builder builder = new ChannelStats.Builder(); + synchronized (lock) { + builder.setTarget(addressGroup.toString()).setState(getState()); + } + callsTracer.updateBuilder(builder); + ret.set(builder.build()); + return ret; + } + @VisibleForTesting ConnectivityState getState() { try { @@ -582,4 +605,49 @@ final class InternalSubchannel implements WithLogId { @ForOverride void onNotInUse(InternalSubchannel is) { } } + + @VisibleForTesting + static final class CallTracingTransport extends ForwardingConnectionClientTransport { + private final ConnectionClientTransport delegate; + private final CallTracer callTracer; + + private CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer) { + this.delegate = delegate; + this.callTracer = callTracer; + } + + @Override + protected ConnectionClientTransport delegate() { + return delegate; + } + + @Override + public ClientStream newStream( + MethodDescriptor method, Metadata headers, CallOptions callOptions) { + final ClientStream streamDelegate = super.newStream(method, headers, callOptions); + return new ForwardingClientStream() { + @Override + protected ClientStream delegate() { + return streamDelegate; + } + + @Override + public void start(final ClientStreamListener listener) { + callTracer.reportCallStarted(); + super.start(new ForwardingClientStreamListener() { + @Override + protected ClientStreamListener delegate() { + return listener; + } + + @Override + public void closed(Status status, Metadata trailers) { + callTracer.reportCallEnded(status.isOk()); + super.closed(status, trailers); + } + }); + } + }; + } + } } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 4592313a04..62057732de 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -867,7 +867,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume checkNotNull(attrs, "attrs"); // TODO(ejona): can we be even stricter? Like loadBalancer == null? checkState(!terminated, "Channel is terminated"); - final SubchannelImpl subchannel = new SubchannelImpl(attrs, callTracerFactory.create()); + final SubchannelImpl subchannel = new SubchannelImpl(attrs); final InternalSubchannel internalSubchannel = new InternalSubchannel( addressGroup, authority(), userAgent, backoffPolicyProvider, transportFactory, transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor, @@ -898,7 +898,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume inUseStateAggregator.updateObjectInUse(is, false); } }, - proxyDetector); + proxyDetector, + callTracerFactory.create()); subchannel.subchannel = internalSubchannel; logger.log(Level.FINE, "[{0}] {1} created for {2}", new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroup}); @@ -961,7 +962,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume checkState(!terminated, "Channel is terminated"); final OobChannel oobChannel = new OobChannel( authority, oobExecutorPool, transportFactory.getScheduledExecutorService(), - channelExecutor, callTracerFactory); + channelExecutor, callTracerFactory.create()); final InternalSubchannel internalSubchannel = new InternalSubchannel( addressGroup, authority, userAgent, backoffPolicyProvider, transportFactory, transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor, @@ -980,7 +981,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume oobChannel.handleSubchannelStateChange(newState); } }, - proxyDetector); + proxyDetector, + callTracerFactory.create()); oobChannel.setSubchannel(internalSubchannel); runSerialized(new Runnable() { @Override @@ -1111,16 +1113,14 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume InternalSubchannel subchannel; final Object shutdownLock = new Object(); final Attributes attrs; - final CallTracer subchannelCallTracer; @GuardedBy("shutdownLock") boolean shutdownRequested; @GuardedBy("shutdownLock") ScheduledFuture delayedShutdownTask; - SubchannelImpl(Attributes attrs, CallTracer subchannelCallTracer) { + SubchannelImpl(Attributes attrs) { this.attrs = checkNotNull(attrs, "attrs"); - this.subchannelCallTracer = subchannelCallTracer; } @Override @@ -1128,6 +1128,11 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume return subchannel.obtainActiveTransport(); } + @Override + ListenableFuture getStats() { + return subchannel.getStats(); + } + @Override public void shutdown() { synchronized (shutdownLock) { @@ -1188,16 +1193,6 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume public String toString() { return subchannel.getLogId().toString(); } - - @Override - public ListenableFuture getStats() { - SettableFuture ret = SettableFuture.create(); - ChannelStats.Builder builder = new Channelz.ChannelStats.Builder(); - subchannelCallTracer.updateBuilder(builder); - builder.setTarget(target).setState(subchannel.getState()); - ret.set(builder.build()); - return ret; - } } @VisibleForTesting diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index b5ed154e06..1a9c3e5521 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -67,7 +67,6 @@ final class OobChannel extends ManagedChannel implements Instrumented executorPool, ScheduledExecutorService deadlineCancellationExecutor, ChannelExecutor channelExecutor, - CallTracer.Factory callTracerFactory) { + CallTracer callsTracer) { this.authority = checkNotNull(authority, "authority"); this.executorPool = checkNotNull(executorPool, "executorPool"); this.executor = checkNotNull(executorPool.getObject(), "executor"); @@ -116,8 +115,7 @@ final class OobChannel extends ManagedChannel implements Instrumented getStats() { + return subchannel.getStats(); + } + @Override public void requestConnection() { subchannel.obtainActiveTransport(); @@ -149,16 +152,6 @@ final class OobChannel extends ManagedChannel implements Instrumented getStats() { - SettableFuture ret = SettableFuture.create(); - ChannelStats.Builder builder = new ChannelStats.Builder(); - subchannelCallsTracer.updateBuilder(builder); - builder.setTarget(authority).setState(subchannel.getState()); - ret.set(builder.build()); - return ret; - } }; subchannelPicker = new SubchannelPicker() { diff --git a/core/src/test/java/io/grpc/internal/ForwardingClientStreamListenerTest.java b/core/src/test/java/io/grpc/internal/ForwardingClientStreamListenerTest.java new file mode 100644 index 0000000000..814d201164 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/ForwardingClientStreamListenerTest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.grpc.ForwardingTestUtil; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.internal.StreamListener.MessageProducer; +import java.lang.reflect.Method; +import java.util.Collections; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ForwardingClientStreamListenerTest { + private ClientStreamListener mock = mock(ClientStreamListener.class); + private ForwardingClientStreamListener forward = new ForwardingClientStreamListener() { + @Override + protected ClientStreamListener delegate() { + return mock; + } + }; + + @Test + public void allMethodsForwarded() throws Exception { + ForwardingTestUtil.testMethodsForwarded( + ClientStreamListener.class, + mock, + forward, + Collections.emptyList()); + } + + + @Test + public void headersReadTest() { + Metadata headers = new Metadata(); + forward.headersRead(headers); + verify(mock).headersRead(same(headers)); + } + + @Test + public void closedTest() { + Status status = Status.UNKNOWN; + Metadata trailers = new Metadata(); + forward.closed(status, trailers); + verify(mock).closed(same(status), same(trailers)); + } + + @Test + public void messagesAvailableTest() { + MessageProducer producer = mock(MessageProducer.class); + forward.messagesAvailable(producer); + verify(mock).messagesAvailable(same(producer)); + } +} diff --git a/core/src/test/java/io/grpc/internal/ForwardingClientStreamTest.java b/core/src/test/java/io/grpc/internal/ForwardingClientStreamTest.java new file mode 100644 index 0000000000..fca177d840 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/ForwardingClientStreamTest.java @@ -0,0 +1,156 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.grpc.Attributes; +import io.grpc.Compressor; +import io.grpc.Decompressor; +import io.grpc.DecompressorRegistry; +import io.grpc.ForwardingTestUtil; +import io.grpc.Status; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Method; +import java.util.Collections; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ForwardingClientStreamTest { + private ClientStream mock = mock(ClientStream.class); + private ForwardingClientStream forward = new ForwardingClientStream() { + @Override + protected ClientStream delegate() { + return mock; + } + }; + + @Test + public void allMethodsForwarded() throws Exception { + ForwardingTestUtil.testMethodsForwarded( + ClientStream.class, + mock, + forward, + Collections.emptyList()); + } + + @Test + public void requestTest() { + forward.request(1234); + verify(mock).request(1234); + } + + @Test + public void writeMessageTest() { + InputStream is = mock(InputStream.class); + forward.writeMessage(is); + verify(mock).writeMessage(same(is)); + } + + @Test + public void isReadyTest() { + when(mock.isReady()).thenReturn(true); + assertEquals(true, forward.isReady()); + } + + @Test + public void setCompressorTest() { + Compressor compressor = mock(Compressor.class); + forward.setCompressor(compressor); + verify(mock).setCompressor(same(compressor)); + } + + @Test + public void setMessageCompressionTest() { + forward.setMessageCompression(true); + verify(mock).setMessageCompression(true); + } + + @Test + public void cancelTest() { + Status reason = Status.UNKNOWN; + forward.cancel(reason); + verify(mock).cancel(same(reason)); + } + + @Test + public void setAuthorityTest() { + String authority = "authority"; + forward.setAuthority(authority); + verify(mock).setAuthority(authority); + } + + @Test + public void setFullStreamDecompressionTest() { + forward.setFullStreamDecompression(true); + verify(mock).setFullStreamDecompression(true); + } + + @Test + public void setDecompressorRegistryTest() { + DecompressorRegistry decompressor = + DecompressorRegistry.emptyInstance().with(new Decompressor() { + @Override + public String getMessageEncoding() { + return "some-encoding"; + } + + @Override + public InputStream decompress(InputStream is) throws IOException { + return is; + } + }, true); + forward.setDecompressorRegistry(decompressor); + verify(mock).setDecompressorRegistry(same(decompressor)); + } + + @Test + public void startTest() { + ClientStreamListener listener = mock(ClientStreamListener.class); + forward.start(listener); + verify(mock).start(same(listener)); + } + + @Test + public void setMaxInboundMessageSizeTest() { + int size = 4567; + forward.setMaxInboundMessageSize(size); + verify(mock).setMaxInboundMessageSize(size);; + } + + @Test + public void setMaxOutboundMessageSizeTest() { + int size = 6789; + forward.setMaxOutboundMessageSize(size); + verify(mock).setMaxOutboundMessageSize(size); + } + + @Test + public void getAttributesTest() { + Attributes attr = Attributes.newBuilder().build(); + when(mock.getAttributes()).thenReturn(attr); + assertSame(attr, forward.getAttributes()); + } +} diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java index eefbb3468f..cabae43501 100644 --- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java +++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java @@ -41,6 +41,7 @@ import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; import io.grpc.Status; +import io.grpc.internal.InternalSubchannel.CallTracingTransport; import io.grpc.internal.TestUtils.MockClientTransportInfo; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -205,7 +206,9 @@ public class InternalSubchannelTest { transports.peek().listener.transportReady(); assertExactCallbackInvokes("onStateChange:READY"); assertEquals(READY, internalSubchannel.getState()); - assertSame(transports.peek().transport, internalSubchannel.obtainActiveTransport()); + assertSame( + transports.peek().transport, + ((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate()); // Close the READY transport, will enter IDLE state. assertNoCallbackInvoke(); @@ -323,7 +326,9 @@ public class InternalSubchannelTest { assertExactCallbackInvokes("onStateChange:READY"); assertEquals(READY, internalSubchannel.getState()); - assertSame(transports.peek().transport, internalSubchannel.obtainActiveTransport()); + assertSame( + transports.peek().transport, + ((CallTracingTransport) internalSubchannel.obtainActiveTransport()).delegate()); // Then close it. assertNoCallbackInvoke(); transports.poll().listener.transportShutdown(Status.UNAVAILABLE); @@ -952,7 +957,7 @@ public class InternalSubchannelTest { internalSubchannel = new InternalSubchannel(addressGroup, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider, mockTransportFactory, fakeClock.getScheduledExecutorService(), fakeClock.getStopwatchSupplier(), channelExecutor, mockInternalSubchannelCallback, - proxyDetector); + proxyDetector, CallTracer.getDefaultFactory().create()); } private void assertNoCallbackInvoke() { diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 5de808b4e1..a572932c29 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -1671,7 +1671,7 @@ public class ManagedChannelImplTest { assertEquals(target, getStats(channel).target); Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); - assertEquals(target, getStats((AbstractSubchannel) subchannel).target); + assertEquals(addressGroup.toString(), getStats((AbstractSubchannel) subchannel).target); } @Test @@ -1719,128 +1719,142 @@ public class ManagedChannelImplTest { } @Test - public void channelStat_callEndSuccess() throws Exception { - // set up - Metadata headers = new Metadata(); - ClientStream mockStream = mock(ClientStream.class); + public void channelsAndSubChannels_instrumented_success() throws Exception { + channelsAndSubchannels_instrumented0(true); + } + + @Test + public void channelsAndSubChannels_instrumented_fail() throws Exception { + channelsAndSubchannels_instrumented0(false); + } + + private void channelsAndSubchannels_instrumented0(boolean success) throws Exception { createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); - // Start a call with a call executor - CallOptions options = - CallOptions.DEFAULT.withExecutor(executor.getScheduledExecutorService()); - ClientCall call = channel.newCall(method, options); - call.start(mockCallListener, headers); + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); - // Make the transport available - Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); + // Channel stat bumped when ClientCall.start() called + assertEquals(0, getStats(channel).callsStarted); + call.start(mockCallListener, new Metadata()); + assertEquals(1, getStats(channel).callsStarted); + + ClientStream mockStream = mock(ClientStream.class); + ClientStreamTracer.Factory factory = mock(ClientStreamTracer.Factory.class); + AbstractSubchannel subchannel = + (AbstractSubchannel) helper.createSubchannel(addressGroup, Attributes.EMPTY); subchannel.requestConnection(); MockClientTransportInfo transportInfo = transports.poll(); - ConnectionClientTransport mockTransport = transportInfo.transport; - ManagedClientTransport.Listener transportListener = transportInfo.listener; - when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class))) + transportInfo.listener.transportReady(); + ClientTransport mockTransport = transportInfo.transport; + when(mockTransport.newStream( + any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) .thenReturn(mockStream); - transportListener.transportReady(); - when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) - .thenReturn(PickResult.withSubchannel(subchannel)); + when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn( + PickResult.withSubchannel(subchannel, factory)); + + // subchannel stat bumped when call gets assigned to it + assertEquals(0, getStats(subchannel).callsStarted); helper.updateBalancingState(READY, mockPicker); - - executor.runDueTasks(); + assertEquals(1, executor.runDueTasks()); verify(mockStream).start(streamListenerCaptor.capture()); - // end set up + assertEquals(1, getStats(subchannel).callsStarted); - // the actual test ClientStreamListener streamListener = streamListenerCaptor.getValue(); call.halfClose(); - assertEquals(0, getStats(channel).callsSucceeded); - assertEquals(0, getStats(channel).callsFailed); - streamListener.closed(Status.OK, new Metadata()); - executor.runDueTasks(); - assertEquals(1, getStats(channel).callsSucceeded); - assertEquals(0, getStats(channel).callsFailed); - } - @Test - public void channelStat_callEndFail() throws Exception { - createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); - ClientCall call = channel.newCall(method, CallOptions.DEFAULT); - call.start(mockCallListener, new Metadata()); - call.cancel("msg", null); + // closing stream listener affects subchannel stats immediately + assertEquals(0, getStats(subchannel).callsSucceeded); + assertEquals(0, getStats(subchannel).callsFailed); + streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata()); + if (success) { + assertEquals(1, getStats(subchannel).callsSucceeded); + assertEquals(0, getStats(subchannel).callsFailed); + } else { + assertEquals(0, getStats(subchannel).callsSucceeded); + assertEquals(1, getStats(subchannel).callsFailed); + } + // channel stats bumped when the ClientCall.Listener is notified assertEquals(0, getStats(channel).callsSucceeded); assertEquals(0, getStats(channel).callsFailed); executor.runDueTasks(); - verify(mockCallListener).onClose(any(Status.class), any(Metadata.class)); - assertEquals(0, getStats(channel).callsSucceeded); - assertEquals(1, getStats(channel).callsFailed); + if (success) { + assertEquals(1, getStats(channel).callsSucceeded); + assertEquals(0, getStats(channel).callsFailed); + } else { + assertEquals(0, getStats(channel).callsSucceeded); + assertEquals(1, getStats(channel).callsFailed); + } } @Test - public void channelStat_callStarted_oob() throws Exception { - createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); - OobChannel oob1 = (OobChannel) helper.createOobChannel(addressGroup, "oob1authority"); - ClientCall call = oob1.newCall(method, CallOptions.DEFAULT); - - assertEquals(0, getStats(channel).callsStarted); - call.start(mockCallListener, new Metadata()); - // only oob channel stats updated - assertEquals(1, getStats(oob1).callsStarted); - assertEquals(0, getStats(channel).callsStarted); - assertEquals(executor.currentTimeMillis(), getStats(oob1).lastCallStartedMillis); + public void channelsAndSubchannels_oob_instrumented_success() throws Exception { + channelsAndSubchannels_oob_instrumented0(true); } @Test - public void channelStat_callEndSuccess_oob() throws Exception { + public void channelsAndSubchannels_oob_instrumented_fail() throws Exception { + channelsAndSubchannels_oob_instrumented0(false); + } + + private void channelsAndSubchannels_oob_instrumented0(boolean success) throws Exception { // set up ClientStream mockStream = mock(ClientStream.class); createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobauthority"); + AbstractSubchannel oobSubchannel = (AbstractSubchannel) oobChannel.getSubchannel(); FakeClock callExecutor = new FakeClock(); CallOptions options = CallOptions.DEFAULT.withExecutor(callExecutor.getScheduledExecutorService()); ClientCall call = oobChannel.newCall(method, options); Metadata headers = new Metadata(); + + // Channel stat bumped when ClientCall.start() called + assertEquals(0, getStats(oobChannel).callsStarted); call.start(mockCallListener, headers); + assertEquals(1, getStats(oobChannel).callsStarted); MockClientTransportInfo transportInfo = transports.poll(); ConnectionClientTransport mockTransport = transportInfo.transport; ManagedClientTransport.Listener transportListener = transportInfo.listener; when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class))) .thenReturn(mockStream); + + // subchannel stat bumped when call gets assigned to it + assertEquals(0, getStats(oobSubchannel).callsStarted); transportListener.transportReady(); callExecutor.runDueTasks(); verify(mockStream).start(streamListenerCaptor.capture()); - // end set up + assertEquals(1, getStats(oobSubchannel).callsStarted); - // the actual test ClientStreamListener streamListener = streamListenerCaptor.getValue(); call.halfClose(); + + // closing stream listener affects subchannel stats immediately + assertEquals(0, getStats(oobSubchannel).callsSucceeded); + assertEquals(0, getStats(oobSubchannel).callsFailed); + streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata()); + if (success) { + assertEquals(1, getStats(oobSubchannel).callsSucceeded); + assertEquals(0, getStats(oobSubchannel).callsFailed); + } else { + assertEquals(0, getStats(oobSubchannel).callsSucceeded); + assertEquals(1, getStats(oobSubchannel).callsFailed); + } + + // channel stats bumped when the ClientCall.Listener is notified assertEquals(0, getStats(oobChannel).callsSucceeded); assertEquals(0, getStats(oobChannel).callsFailed); - streamListener.closed(Status.OK, new Metadata()); callExecutor.runDueTasks(); - // only oob channel stats updated - assertEquals(1, getStats(oobChannel).callsSucceeded); - assertEquals(0, getStats(oobChannel).callsFailed); - assertEquals(0, getStats(channel).callsSucceeded); - assertEquals(0, getStats(channel).callsFailed); - } - - @Test - public void channelStat_callEndFail_oob() throws Exception { - createChannel(new FakeNameResolverFactory(true), NO_INTERCEPTOR); - OobChannel oob1 = (OobChannel) helper.createOobChannel(addressGroup, "oob1authority"); - ClientCall call = oob1.newCall(method, CallOptions.DEFAULT); - call.start(mockCallListener, new Metadata()); - call.cancel("msg", null); - - assertEquals(0, getStats(channel).callsSucceeded); - assertEquals(0, getStats(channel).callsFailed); - oobExecutor.runDueTasks(); - // only oob channel stats updated - verify(mockCallListener).onClose(any(Status.class), any(Metadata.class)); - assertEquals(0, getStats(oob1).callsSucceeded); - assertEquals(1, getStats(oob1).callsFailed); + if (success) { + assertEquals(1, getStats(oobChannel).callsSucceeded); + assertEquals(0, getStats(oobChannel).callsFailed); + } else { + assertEquals(0, getStats(oobChannel).callsSucceeded); + assertEquals(1, getStats(oobChannel).callsFailed); + } + // oob channel is separate from the original channel assertEquals(0, getStats(channel).callsSucceeded); assertEquals(0, getStats(channel).callsFailed); } @@ -2057,6 +2071,10 @@ public class ManagedChannelImplTest { } } + private static ChannelStats getStats(AbstractSubchannel subchannel) throws Exception { + return subchannel.getStats().get(); + } + private static ChannelStats getStats( Instrumented instrumented) throws Exception { return instrumented.getStats().get();