diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index e4f0431101..7cdb69e97e 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -223,6 +223,7 @@ public abstract class AbstractServerStream extends AbstractStream @Override public final void onStreamAllocated() { super.onStreamAllocated(); + getTransportTracer().reportRemoteStreamStarted(); } @Override diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index 03e446eab6..22866f0714 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -235,7 +235,6 @@ public abstract class AbstractStream implements Stream { allocated = true; } notifyIfReady(); - transportTracer.reportStreamStarted(); } /** diff --git a/core/src/main/java/io/grpc/internal/Channelz.java b/core/src/main/java/io/grpc/internal/Channelz.java index d6321ee122..98441ab373 100644 --- a/core/src/main/java/io/grpc/internal/Channelz.java +++ b/core/src/main/java/io/grpc/internal/Channelz.java @@ -295,7 +295,8 @@ public final class Channelz { @Immutable public static final class TransportStats { public final long streamsStarted; - public final long lastStreamCreatedTimeNanos; + public final long lastLocalStreamCreatedTimeNanos; + public final long lastRemoteStreamCreatedTimeNanos; public final long streamsSucceeded; public final long streamsFailed; public final long messagesSent; @@ -311,7 +312,8 @@ public final class Channelz { */ public TransportStats( long streamsStarted, - long lastStreamCreatedTimeNanos, + long lastLocalStreamCreatedTimeNanos, + long lastRemoteStreamCreatedTimeNanos, long streamsSucceeded, long streamsFailed, long messagesSent, @@ -322,7 +324,8 @@ public final class Channelz { long localFlowControlWindow, long remoteFlowControlWindow) { this.streamsStarted = streamsStarted; - this.lastStreamCreatedTimeNanos = lastStreamCreatedTimeNanos; + this.lastLocalStreamCreatedTimeNanos = lastLocalStreamCreatedTimeNanos; + this.lastRemoteStreamCreatedTimeNanos = lastRemoteStreamCreatedTimeNanos; this.streamsSucceeded = streamsSucceeded; this.streamsFailed = streamsFailed; this.messagesSent = messagesSent; diff --git a/core/src/main/java/io/grpc/internal/TransportTracer.java b/core/src/main/java/io/grpc/internal/TransportTracer.java index 5cfda7b8bc..c9391e4228 100644 --- a/core/src/main/java/io/grpc/internal/TransportTracer.java +++ b/core/src/main/java/io/grpc/internal/TransportTracer.java @@ -36,7 +36,8 @@ public final class TransportTracer { private final TimeProvider timeProvider; private long streamsStarted; - private long lastStreamCreatedTimeNanos; + private long lastLocalStreamCreatedTimeNanos; + private long lastRemoteStreamCreatedTimeNanos; private long streamsSucceeded; private long streamsFailed; private long keepAlivesSent; @@ -66,7 +67,8 @@ public final class TransportTracer { flowControlWindowReader == null ? -1 : flowControlWindowReader.read().remoteBytes; return new TransportStats( streamsStarted, - lastStreamCreatedTimeNanos, + lastLocalStreamCreatedTimeNanos, + lastRemoteStreamCreatedTimeNanos, streamsSucceeded, streamsFailed, messagesSent, @@ -79,12 +81,19 @@ public final class TransportTracer { } /** - * Called by the transport to report a stream has started. For clients, this happens when a header - * is sent. For servers, this happens when a header is received. + * Called by the client to report a stream has started. */ - public void reportStreamStarted() { + public void reportLocalStreamStarted() { streamsStarted++; - lastStreamCreatedTimeNanos = currentTimeNanos(); + lastLocalStreamCreatedTimeNanos = currentTimeNanos(); + } + + /** + * Called by the server to report a stream has started. + */ + public void reportRemoteStreamStarted() { + streamsStarted++; + lastRemoteStreamCreatedTimeNanos = currentTimeNanos(); } /** diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 6bc6cdc9df..0e3ecfc874 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -248,6 +248,7 @@ class NettyClientStream extends AbstractClientStream { // Now that the stream has actually been initialized, call the listener's onReady callback if // appropriate. onStreamAllocated(); + getTransportTracer().reportLocalStreamStarted(); } /** diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 740fd6a724..60bdcf7334 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -242,6 +242,7 @@ class OkHttpClientStream extends AbstractClientStream { @Override protected void onStreamAllocated() { super.onStreamAllocated(); + getTransportTracer().reportLocalStreamStarted(); } @GuardedBy("lock") diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 487df35cbc..e165a46940 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -1409,10 +1409,10 @@ public abstract class AbstractTransportTest { { TransportStats serverBefore = getTransportStats(serverTransportListener.transport); assertEquals(0, serverBefore.streamsStarted); - assertEquals(0, serverBefore.lastStreamCreatedTimeNanos); + assertEquals(0, serverBefore.lastRemoteStreamCreatedTimeNanos); TransportStats clientBefore = getTransportStats(client); assertEquals(0, clientBefore.streamsStarted); - assertEquals(0, clientBefore.lastStreamCreatedTimeNanos); + assertEquals(0, clientBefore.lastRemoteStreamCreatedTimeNanos); ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); @@ -1422,14 +1422,14 @@ public abstract class AbstractTransportTest { TransportStats serverAfter = getTransportStats(serverTransportListener.transport); assertEquals(1, serverAfter.streamsStarted); - serverFirstTimestampNanos = serverAfter.lastStreamCreatedTimeNanos; + serverFirstTimestampNanos = serverAfter.lastRemoteStreamCreatedTimeNanos; assertEquals( currentTimeMillis(), - TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos)); + TimeUnit.NANOSECONDS.toMillis(serverAfter.lastRemoteStreamCreatedTimeNanos)); TransportStats clientAfter = getTransportStats(client); assertEquals(1, clientAfter.streamsStarted); - clientFirstTimestampNanos = clientAfter.lastStreamCreatedTimeNanos; + clientFirstTimestampNanos = clientAfter.lastLocalStreamCreatedTimeNanos; assertEquals( currentTimeMillis(), TimeUnit.NANOSECONDS.toMillis(clientFirstTimestampNanos)); @@ -1458,18 +1458,18 @@ public abstract class AbstractTransportTest { assertEquals(2, serverAfter.streamsStarted); assertEquals( TimeUnit.MILLISECONDS.toNanos(elapsedMillis), - serverAfter.lastStreamCreatedTimeNanos - serverFirstTimestampNanos); + serverAfter.lastRemoteStreamCreatedTimeNanos - serverFirstTimestampNanos); long serverSecondTimestamp = - TimeUnit.NANOSECONDS.toMillis(serverAfter.lastStreamCreatedTimeNanos); + TimeUnit.NANOSECONDS.toMillis(serverAfter.lastRemoteStreamCreatedTimeNanos); assertEquals(currentTimeMillis(), serverSecondTimestamp); TransportStats clientAfter = getTransportStats(client); assertEquals(2, clientAfter.streamsStarted); assertEquals( TimeUnit.MILLISECONDS.toNanos(elapsedMillis), - clientAfter.lastStreamCreatedTimeNanos - clientFirstTimestampNanos); + clientAfter.lastLocalStreamCreatedTimeNanos - clientFirstTimestampNanos); long clientSecondTimestamp = - TimeUnit.NANOSECONDS.toMillis(clientAfter.lastStreamCreatedTimeNanos); + TimeUnit.NANOSECONDS.toMillis(clientAfter.lastLocalStreamCreatedTimeNanos); assertEquals(currentTimeMillis(), clientSecondTimestamp); ServerStream serverStream = serverStreamCreation.stream;