diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java index 91c15a3e97..555d63468e 100644 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java @@ -86,6 +86,8 @@ public abstract class AbstractClientStream extends AbstractStream void cancel(Status status); } + @Nullable // okhttp does not support transportTracer yet + private final TransportTracer transportTracer; private final Framer framer; private boolean useGet; private Metadata headers; @@ -104,6 +106,7 @@ public abstract class AbstractClientStream extends AbstractStream Metadata headers, boolean useGet) { Preconditions.checkNotNull(headers, "headers"); + this.transportTracer = transportTracer; this.useGet = useGet; if (!useGet) { framer = new MessageFramer(this, bufferAllocator, statsTraceCtx); @@ -189,6 +192,10 @@ public abstract class AbstractClientStream extends AbstractStream return super.isReady() && !cancelled; } + protected TransportTracer getTransportTracer() { + return transportTracer; + } + /** This should only called from the transport thread. */ protected abstract static class TransportState extends AbstractStream.TransportState { /** Whether listener.closed() has been called. */ @@ -383,6 +390,9 @@ public abstract class AbstractClientStream extends AbstractStream listenerClosed = true; statsTraceCtx.streamClosed(status); listener().closed(status, trailers); + if (getTransportTracer() != null) { + getTransportTracer().reportStreamClosed(status.isOk()); + } } } } diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index b984337824..e4f0431101 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -191,7 +191,6 @@ public abstract class AbstractServerStream extends AbstractStream private boolean listenerClosed; private ServerStreamListener listener; private final StatsTraceContext statsTraceCtx; - private final TransportTracer transportTracer; private boolean endOfStream = false; private boolean deframerClosed = false; @@ -210,7 +209,6 @@ public abstract class AbstractServerStream extends AbstractStream statsTraceCtx, Preconditions.checkNotNull(transportTracer, "transportTracer")); this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx"); - this.transportTracer = transportTracer; } /** @@ -225,7 +223,6 @@ public abstract class AbstractServerStream extends AbstractStream @Override public final void onStreamAllocated() { super.onStreamAllocated(); - transportTracer.reportStreamStarted(); } @Override @@ -336,9 +333,9 @@ public abstract class AbstractServerStream extends AbstractStream if (!listenerClosed) { if (!newStatus.isOk()) { statsTraceCtx.streamClosed(newStatus); - transportTracer.reportStreamClosed(false); + getTransportTracer().reportStreamClosed(false); } else { - transportTracer.reportStreamClosed(closedStatus.isOk()); + getTransportTracer().reportStreamClosed(closedStatus.isOk()); } listenerClosed = true; onStreamDeallocated(); diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index f46f5e891c..8bb9df5fa8 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -108,6 +108,8 @@ public abstract class AbstractStream implements Stream { private Deframer deframer; private final Object onReadyLock = new Object(); private final StatsTraceContext statsTraceCtx; + @Nullable // okhttp transports don't trace yet + private final TransportTracer transportTracer; /** * The number of bytes currently queued, waiting to be sent. When this falls below @@ -131,8 +133,9 @@ public abstract class AbstractStream implements Stream { protected TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, - @Nullable TransportTracer transportTracer) { // nullable: client streams don't trace yet + @Nullable TransportTracer transportTracer) { // nullable: okhttp transports don't trace yet this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); + this.transportTracer = transportTracer; deframer = new MessageDeframer( this, Codec.Identity.NONE, @@ -231,6 +234,9 @@ public abstract class AbstractStream implements Stream { allocated = true; } notifyIfReady(); + if (transportTracer != null) { + transportTracer.reportStreamStarted(); + } } /** @@ -281,6 +287,10 @@ public abstract class AbstractStream implements Stream { } } + protected TransportTracer getTransportTracer() { + return transportTracer; + } + private void notifyIfReady() { boolean doNotify; synchronized (onReadyLock) { diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 7869455cd0..2889e78fe3 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -25,7 +25,6 @@ import com.google.common.base.Splitter; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.grpc.CallOptions; import io.grpc.ClientStreamTracer; @@ -681,11 +680,10 @@ public final class GrpcUtil { transport.ping(callback, executor); } + @Nullable @Override public Future getTransportStats() { - SettableFuture ret = SettableFuture.create(); - ret.set(null); - return ret; + return transport.getTransportStats(); } }; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index aea4643ed5..f680cd8e9c 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -31,6 +31,7 @@ import io.grpc.internal.ClientTransport.PingCallback; import io.grpc.internal.GrpcUtil; import io.grpc.internal.Http2Ping; import io.grpc.internal.KeepAliveManager; +import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; @@ -52,6 +53,7 @@ import io.netty.handler.codec.http2.Http2ConnectionAdapter; import io.netty.handler.codec.http2.Http2ConnectionDecoder; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2FlowController; import io.netty.handler.codec.http2.Http2FrameAdapter; import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2FrameReader; @@ -97,6 +99,7 @@ class NettyClientHandler extends AbstractNettyHandler { private final KeepAliveManager keepAliveManager; // Returns new unstarted stopwatches private final Supplier stopwatchFactory; + private final TransportTracer transportTracer; private WriteQueue clientWriteQueue; private Http2Ping ping; private Attributes attributes = Attributes.EMPTY; @@ -107,7 +110,8 @@ class NettyClientHandler extends AbstractNettyHandler { int flowControlWindow, int maxHeaderListSize, Supplier stopwatchFactory, - Runnable tooManyPingsRunnable) { + Runnable tooManyPingsRunnable, + TransportTracer transportTracer) { Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize); Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); @@ -128,12 +132,13 @@ class NettyClientHandler extends AbstractNettyHandler { flowControlWindow, maxHeaderListSize, stopwatchFactory, - tooManyPingsRunnable); + tooManyPingsRunnable, + transportTracer); } @VisibleForTesting static NettyClientHandler newHandler( - Http2Connection connection, + final Http2Connection connection, Http2FrameReader frameReader, Http2FrameWriter frameWriter, ClientTransportLifecycleManager lifecycleManager, @@ -141,7 +146,8 @@ class NettyClientHandler extends AbstractNettyHandler { int flowControlWindow, int maxHeaderListSize, Supplier stopwatchFactory, - Runnable tooManyPingsRunnable) { + Runnable tooManyPingsRunnable, + TransportTracer transportTracer) { Preconditions.checkNotNull(connection, "connection"); Preconditions.checkNotNull(frameReader, "frameReader"); Preconditions.checkNotNull(lifecycleManager, "lifecycleManager"); @@ -164,6 +170,18 @@ class NettyClientHandler extends AbstractNettyHandler { Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader); + transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() { + final Http2FlowController local = connection.local().flowController(); + final Http2FlowController remote = connection.remote().flowController(); + + @Override + public TransportTracer.FlowControlWindows read() { + return new TransportTracer.FlowControlWindows( + local.windowSize(connection.connectionStream()), + remote.windowSize(connection.connectionStream())); + } + }); + Http2Settings settings = new Http2Settings(); settings.pushEnabled(false); settings.initialWindowSize(flowControlWindow); @@ -177,7 +195,8 @@ class NettyClientHandler extends AbstractNettyHandler { lifecycleManager, keepAliveManager, stopwatchFactory, - tooManyPingsRunnable); + tooManyPingsRunnable, + transportTracer); } private NettyClientHandler( @@ -187,11 +206,13 @@ class NettyClientHandler extends AbstractNettyHandler { ClientTransportLifecycleManager lifecycleManager, KeepAliveManager keepAliveManager, Supplier stopwatchFactory, - final Runnable tooManyPingsRunnable) { + final Runnable tooManyPingsRunnable, + TransportTracer transportTracer) { super(decoder, encoder, settings); this.lifecycleManager = lifecycleManager; this.keepAliveManager = keepAliveManager; this.stopwatchFactory = stopwatchFactory; + this.transportTracer = Preconditions.checkNotNull(transportTracer); // Set the frame listener on the decoder. decoder().frameListener(new FrameListener()); @@ -550,7 +571,9 @@ class NettyClientHandler extends AbstractNettyHandler { promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { + if (future.isSuccess()) { + transportTracer.reportKeepAliveSent(); + } else { Throwable cause = future.cause(); if (cause instanceof ClosedChannelException) { cause = lifecycleManager.getShutdownThrowable(); diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 76eba44142..b97270ad7b 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -32,6 +32,7 @@ import io.grpc.Status; import io.grpc.internal.AbstractClientStream; import io.grpc.internal.Http2ClientStreamTransportState; import io.grpc.internal.StatsTraceContext; +import io.grpc.internal.TransportTracer; import io.grpc.internal.WritableBuffer; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; @@ -61,12 +62,19 @@ class NettyClientStream extends AbstractClientStream { private final AsciiString userAgent; NettyClientStream( - TransportState state, MethodDescriptor method, Metadata headers, - Channel channel, AsciiString authority, AsciiString scheme, AsciiString userAgent, - StatsTraceContext statsTraceCtx) { - super(new NettyWritableBufferAllocator(channel.alloc()), + TransportState state, + MethodDescriptor method, + Metadata headers, + Channel channel, + AsciiString authority, + AsciiString scheme, + AsciiString userAgent, + StatsTraceContext statsTraceCtx, + TransportTracer transportTracer) { + super( + new NettyWritableBufferAllocator(channel.alloc()), statsTraceCtx, - null, + transportTracer, headers, useGet(method)); this.state = checkNotNull(state, "transportState"); @@ -149,7 +157,7 @@ class NettyClientStream extends AbstractClientStream { @Override public void writeFrame( - WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { + WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) { Preconditions.checkArgument(numMessages >= 0); ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf(); final int numBytes = bytebuf.readableBytes(); @@ -167,6 +175,7 @@ class NettyClientStream extends AbstractClientStream { // Remove the bytes from outbound flow control, optionally notifying // the client that they can send more bytes. transportState().onSentBytes(numBytes); + NettyClientStream.this.getTransportTracer().reportMessageSent(numMessages); } } }), flush); @@ -205,9 +214,13 @@ class NettyClientStream extends AbstractClientStream { private int id; private Http2Stream http2Stream; - public TransportState(NettyClientHandler handler, EventLoop eventLoop, int maxMessageSize, - StatsTraceContext statsTraceCtx) { - super(maxMessageSize, statsTraceCtx, /*transportTracer=*/null); + public TransportState( + NettyClientHandler handler, + EventLoop eventLoop, + int maxMessageSize, + StatsTraceContext statsTraceCtx, + TransportTracer transportTracer) { + super(maxMessageSize, statsTraceCtx, transportTracer); this.handler = checkNotNull(handler, "handler"); this.eventLoop = checkNotNull(eventLoop, "eventLoop"); } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 0d84a92aff..b977a0f70f 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -50,6 +50,7 @@ import io.netty.util.AsciiString; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.Future; import javax.annotation.Nullable; @@ -84,6 +85,8 @@ class NettyClientTransport implements ConnectionClientTransport { private Status statusExplainingWhyTheChannelIsNull; /** Since not thread-safe, may only be used from event loop. */ private ClientTransportLifecycleManager lifecycleManager; + /** Since not thread-safe, may only be used from event loop. */ + private final TransportTracer transportTracer = new TransportTracer(); NettyClientTransport( SocketAddress address, Class channelType, @@ -146,15 +149,25 @@ class NettyClientTransport implements ConnectionClientTransport { } StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers); return new NettyClientStream( - new NettyClientStream.TransportState(handler, channel.eventLoop(), maxMessageSize, - statsTraceCtx) { + new NettyClientStream.TransportState( + handler, + channel.eventLoop(), + maxMessageSize, + statsTraceCtx, + transportTracer) { @Override protected Status statusFromFailedFuture(ChannelFuture f) { return NettyClientTransport.this.statusFromFailedFuture(f); } }, - method, headers, channel, authority, negotiationHandler.scheme(), userAgent, - statsTraceCtx); + method, + headers, + channel, + authority, + negotiationHandler.scheme(), + userAgent, + statsTraceCtx, + transportTracer); } @SuppressWarnings("unchecked") @@ -175,7 +188,8 @@ class NettyClientTransport implements ConnectionClientTransport { flowControlWindow, maxHeaderListSize, GrpcUtil.STOPWATCH_SUPPLIER, - tooManyPingsRunnable); + tooManyPingsRunnable, + transportTracer); NettyHandlerSettings.setAutoWindow(handler); negotiationHandler = negotiator.newHandler(handler); @@ -295,9 +309,20 @@ class NettyClientTransport implements ConnectionClientTransport { @Override public Future getTransportStats() { - SettableFuture ret = SettableFuture.create(); - ret.set(null); - return ret; + if (channel.eventLoop().inEventLoop()) { + // This is necessary, otherwise we will block forever if we get the future from inside + // the event loop. + SettableFuture result = SettableFuture.create(); + result.set(transportTracer.getStats()); + return result; + } + return channel.eventLoop().submit( + new Callable() { + @Override + public TransportTracer.Stats call() throws Exception { + return transportTracer.getStats(); + } + }); } @VisibleForTesting diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index b3f97bdba2..a3097ad402 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -26,7 +26,6 @@ import static io.grpc.netty.Utils.STATUS_OK; import static io.grpc.netty.Utils.TE_HEADER; import static io.grpc.netty.Utils.TE_TRAILERS; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; -import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -59,6 +58,7 @@ import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; +import io.grpc.internal.TransportTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; @@ -104,7 +104,6 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase streamListenerMessageQueue = new LinkedList(); + @Override + protected void manualSetUp() throws Exception { + setUp(); + } + /** * Set up for test. */ @@ -156,8 +160,11 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase captor = ArgumentCaptor.forClass(ByteBuf.class); verifyWrite().writePing(eq(ctx()), eq(false), captor.capture(), @@ -534,7 +550,9 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase captor = ArgumentCaptor.forClass(ByteBuf.class); verifyWrite().writePing(eq(ctx()), eq(false), captor.capture(), any(ChannelPromise.class)); ByteBuf payload = captor.getValue(); @@ -575,6 +597,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase cmdCap = ArgumentCaptor.forClass(CreateStreamCommand.class); @@ -430,9 +445,15 @@ public class NettyClientStreamTest extends NettyStreamTestBase { */ protected void manualSetUp() throws Exception {} + protected final TransportTracer transportTracer = new TransportTracer(); + protected int flowControlWindow = DEFAULT_WINDOW_SIZE; + private final FakeClock fakeClock = new FakeClock(); FakeClock fakeClock() { @@ -451,4 +456,53 @@ public abstract class NettyHandlerTestBase { assertEquals(maxWindow, localFlowController.initialWindowSize(connectionStream)); } + @Test + public void transportTracer_windowSizeDefault() throws Exception { + manualSetUp(); + TransportTracer.Stats stats = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow); + assertEquals(flowControlWindow, stats.localFlowControlWindow); + } + + @Test + public void transportTracer_windowSize() throws Exception { + flowControlWindow = 1024 * 1024; + manualSetUp(); + TransportTracer.Stats stats = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, stats.remoteFlowControlWindow); + assertEquals(flowControlWindow, stats.localFlowControlWindow); + } + + @Test + public void transportTracer_windowUpdate_remote() throws Exception { + manualSetUp(); + TransportTracer.Stats before = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.localFlowControlWindow); + + ByteBuf serializedSettings = windowUpdate(0, 1000); + channelRead(serializedSettings); + TransportTracer.Stats after = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE + 1000, + after.remoteFlowControlWindow); + assertEquals(flowControlWindow, after.localFlowControlWindow); + } + + @Test + public void transportTracer_windowUpdate_local() throws Exception { + manualSetUp(); + TransportTracer.Stats before = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, before.remoteFlowControlWindow); + assertEquals(flowControlWindow, before.localFlowControlWindow); + + // If the window size is below a certain threshold, netty will wait to apply the update. + // Use a large increment to be sure that it exceeds the threshold. + connection().local().flowController().incrementWindowSize( + connection().connectionStream(), 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE); + + TransportTracer.Stats after = transportTracer.getStats(); + assertEquals(Http2CodecUtil.DEFAULT_WINDOW_SIZE, after.remoteFlowControlWindow); + assertEquals(flowControlWindow + 8 * Http2CodecUtil.DEFAULT_WINDOW_SIZE, + connection().local().flowController().windowSize(connection().connectionStream())); + } } diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index cd8ee55e67..acf2710dbc 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -29,7 +29,6 @@ import static io.grpc.netty.Utils.HTTP_METHOD; import static io.grpc.netty.Utils.TE_HEADER; import static io.grpc.netty.Utils.TE_TRAILERS; import static io.netty.buffer.Unpooled.directBuffer; -import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -66,7 +65,6 @@ import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.StreamListener; -import io.grpc.internal.TransportTracer; import io.grpc.internal.testing.TestServerStreamTracer; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; import io.netty.buffer.ByteBuf; @@ -126,7 +124,6 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase streamListenerMessageQueue = new LinkedList(); - private int flowControlWindow = DEFAULT_WINDOW_SIZE; private int maxConcurrentStreams = Integer.MAX_VALUE; private int maxHeaderListSize = Integer.MAX_VALUE; private boolean permitKeepAliveWithoutCalls = true; @@ -136,7 +133,6 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase firstTimestamp); - long secondTimestamp = TimeUnit.NANOSECONDS.toMillis(after.lastStreamCreatedTimeNanos); - assertThat(System.currentTimeMillis() - secondTimestamp).isAtMost(50L); + TransportTracer.Stats serverAfter = + serverTransportListener.transport.getTransportStats().get(); + assertEquals(2, serverAfter.streamsStarted); + assertTrue(serverAfter.lastStreamCreatedTimeNanos > serverFirstTimestampNanos); + long serverSecondTimestamp = + TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos); + assertThat(System.currentTimeMillis() - serverSecondTimestamp).isAtMost(50L); + + TransportTracer.Stats clientAfter = client.getTransportStats().get(); + assertEquals(2, clientAfter.streamsStarted); + assertTrue(clientAfter.lastStreamCreatedTimeNanos > clientFirstTimestampNanos); + long clientSecondTimestamp = + TimeUnit.NANOSECONDS.toMillis(clientAfter.lastStreamCreatedTimeNanos); + assertThat(System.currentTimeMillis() - clientSecondTimestamp).isAtMost(50L); ServerStream serverStream = serverStreamCreation.stream; serverStream.close(Status.OK, new Metadata()); @@ -1435,7 +1465,7 @@ public abstract class AbstractTransportTest { public void transportTracer_server_streamEnded_ok() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + runIfNotNull(client.start(mockClientTransportListener)); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1448,24 +1478,35 @@ public abstract class AbstractTransportTest { return; } - TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get(); - assertEquals(0, before.streamsSucceeded); + TransportTracer.Stats serverBefore = + serverTransportListener.transport.getTransportStats().get(); + assertEquals(0, serverBefore.streamsSucceeded); + assertEquals(0, serverBefore.streamsFailed); + TransportTracer.Stats clientBefore = client.getTransportStats().get(); + assertEquals(0, clientBefore.streamsSucceeded); + assertEquals(0, clientBefore.streamsFailed); + clientStream.halfClose(); serverStream.close(Status.OK, new Metadata()); - // Block until the close actually happened before verifying stats - serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + // do not validate stats until close() has been called on client + assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get(); - assertEquals(1, after.streamsSucceeded); - assertEquals(0, after.streamsFailed); + TransportTracer.Stats serverAfter = + serverTransportListener.transport.getTransportStats().get(); + assertEquals(1, serverAfter.streamsSucceeded); + assertEquals(0, serverAfter.streamsFailed); + TransportTracer.Stats clientAfter = client.getTransportStats().get(); + assertEquals(1, clientAfter.streamsSucceeded); + assertEquals(0, clientAfter.streamsFailed); } @Test public void transportTracer_server_streamEnded_nonOk() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + runIfNotNull(client.start(mockClientTransportListener)); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1478,16 +1519,29 @@ public abstract class AbstractTransportTest { return; } - TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get(); - assertEquals(0, before.streamsFailed); + TransportTracer.Stats serverBefore = + serverTransportListener.transport.getTransportStats().get(); + assertEquals(0, serverBefore.streamsFailed); + assertEquals(0, serverBefore.streamsSucceeded); + TransportTracer.Stats clientBefore = client.getTransportStats().get(); + assertEquals(0, clientBefore.streamsFailed); + assertEquals(0, clientBefore.streamsSucceeded); serverStream.close(Status.UNKNOWN, new Metadata()); - // Block until the close actually happened before verifying stats - serverStreamCreation.listener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + // do not validate stats until close() has been called on client + assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get(); - assertEquals(1, after.streamsFailed); - assertEquals(0, after.streamsSucceeded); + + TransportTracer.Stats serverAfter = + serverTransportListener.transport.getTransportStats().get(); + assertEquals(1, serverAfter.streamsFailed); + assertEquals(0, serverAfter.streamsSucceeded); + TransportTracer.Stats clientAfter = client.getTransportStats().get(); + assertEquals(1, clientAfter.streamsFailed); + assertEquals(0, clientAfter.streamsSucceeded); + + client.shutdown(Status.UNAVAILABLE); } @Test @@ -1510,10 +1564,9 @@ public abstract class AbstractTransportTest { serverTransportListener.transport.getTransportStats().get(); assertEquals(0, serverBefore.streamsFailed); assertEquals(0, serverBefore.streamsSucceeded); - // TODO(zpencer): uncomment when integrated with client transport - // TransportTracer.Stats clientBefore = client.getTransportStats().get(); - // assertEquals(0, clientBefore.streamsFailed); - // assertEquals(0, clientBefore.streamsSucceeded); + TransportTracer.Stats clientBefore = client.getTransportStats().get(); + assertEquals(0, clientBefore.streamsFailed); + assertEquals(0, clientBefore.streamsSucceeded); clientStream.cancel(Status.UNKNOWN); // do not validate stats until close() has been called on server @@ -1523,17 +1576,16 @@ public abstract class AbstractTransportTest { serverTransportListener.transport.getTransportStats().get(); assertEquals(1, serverAfter.streamsFailed); assertEquals(0, serverAfter.streamsSucceeded); - // TODO(zpencer): uncomment when integrated with client transport - // TransportTracer.Stats clientAfter = client.getTransportStats().get(); - // assertEquals(1, clientAfter.streamsFailed); - // assertEquals(0, clientAfter.streamsSucceeded); + TransportTracer.Stats clientAfter = client.getTransportStats().get(); + assertEquals(1, clientAfter.streamsFailed); + assertEquals(0, clientAfter.streamsSucceeded); } @Test - public void transportTracer_receive_msg() throws Exception { + public void transportTracer_server_receive_msg() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + runIfNotNull(client.start(mockClientTransportListener)); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1547,9 +1599,13 @@ public abstract class AbstractTransportTest { return; } - TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get(); - assertEquals(0, before.messagesReceived); - assertEquals(0, before.lastMessageReceivedTimeNanos); + TransportTracer.Stats serverBefore = + serverTransportListener.transport.getTransportStats().get(); + assertEquals(0, serverBefore.messagesReceived); + assertEquals(0, serverBefore.lastMessageReceivedTimeNanos); + TransportTracer.Stats clientBefore = client.getTransportStats().get(); + assertEquals(0, clientBefore.messagesSent); + assertEquals(0, clientBefore.lastMessageSentTimeNanos); serverStream.request(1); clientStream.writeMessage(methodDescriptor.streamRequest("request")); @@ -1557,19 +1613,26 @@ public abstract class AbstractTransportTest { clientStream.halfClose(); verifyMessageCountAndClose(serverStreamListener.messageQueue, 1); - TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get(); - assertEquals(1, after.messagesReceived); - long timestamp = TimeUnit.NANOSECONDS.toMillis(after.lastMessageReceivedTimeNanos); - assertThat(System.currentTimeMillis() - timestamp).isAtMost(50L); + TransportTracer.Stats serverAfter = + serverTransportListener.transport.getTransportStats().get(); + assertEquals(1, serverAfter.messagesReceived); + long serverTimestamp = + TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageReceivedTimeNanos); + assertThat(System.currentTimeMillis() - serverTimestamp).isAtMost(50L); + TransportTracer.Stats clientAfter = client.getTransportStats().get(); + assertEquals(1, clientAfter.messagesSent); + long clientTimestamp = + TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageSentTimeNanos); + assertThat(System.currentTimeMillis() - clientTimestamp).isAtMost(50L); serverStream.close(Status.OK, new Metadata()); } @Test - public void transportTracer_send_msg() throws Exception { + public void transportTracer_server_send_msg() throws Exception { server.start(serverListener); client = newClientTransport(server); - runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class))); + runIfNotNull(client.start(mockClientTransportListener)); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); @@ -1582,9 +1645,13 @@ public abstract class AbstractTransportTest { return; } - TransportTracer.Stats before = serverTransportListener.transport.getTransportStats().get(); - assertEquals(0, before.messagesSent); - assertEquals(0, before.lastMessageSentTimeNanos); + TransportTracer.Stats serverBefore = + serverTransportListener.transport.getTransportStats().get(); + assertEquals(0, serverBefore.messagesSent); + assertEquals(0, serverBefore.lastMessageSentTimeNanos); + TransportTracer.Stats clientBefore = client.getTransportStats().get(); + assertEquals(0, clientBefore.messagesReceived); + assertEquals(0, clientBefore.lastMessageReceivedTimeNanos); clientStream.request(1); serverStream.writeHeaders(new Metadata()); @@ -1592,11 +1659,16 @@ public abstract class AbstractTransportTest { serverStream.flush(); verifyMessageCountAndClose(clientStreamListener.messageQueue, 1); - TransportTracer.Stats after = serverTransportListener.transport.getTransportStats().get(); - assertEquals(1, after.messagesSent); - long timestamp = TimeUnit.NANOSECONDS.toMillis(after.lastMessageSentTimeNanos); - assertThat(System.currentTimeMillis() - timestamp).isAtMost(50L); - + TransportTracer.Stats serverAfter = + serverTransportListener.transport.getTransportStats().get(); + assertEquals(1, serverAfter.messagesSent); + long serverTimestmap = TimeUnit.NANOSECONDS.toMillis(serverAfter.lastMessageSentTimeNanos); + assertThat(System.currentTimeMillis() - serverTimestmap).isAtMost(50L); + TransportTracer.Stats clientAfter = client.getTransportStats().get(); + assertEquals(1, clientAfter.messagesReceived); + long clientTimestmap = + TimeUnit.NANOSECONDS.toMillis(clientAfter.lastMessageReceivedTimeNanos); + assertThat(System.currentTimeMillis() - clientTimestmap).isAtMost(50L); serverStream.close(Status.OK, new Metadata()); }