diff --git a/core/BUILD.bazel b/core/BUILD.bazel index 61fa3bbf89..e1769e91e0 100644 --- a/core/BUILD.bazel +++ b/core/BUILD.bazel @@ -40,6 +40,7 @@ java_library( visibility = ["//:__subpackages__"], deps = [ ":core", + ":perfmark", "//context", "@com_google_android_annotations//jar", "@com_google_code_findbugs_jsr305//jar", @@ -70,3 +71,18 @@ java_library( "@com_google_j2objc_j2objc_annotations//jar", ], ) + +java_library( + name = "perfmark", + srcs = glob( + [ + "src/main/java/io/grpc/perfmark/*.java", + ], + exclude = ["src/main/java/io/grpc/perfmark/package-info.java"], + ), + visibility = ["//:__subpackages__"], + deps = [ + "@com_google_code_findbugs_jsr305//jar", + "@com_google_errorprone_error_prone_annotations//jar", + ], +) diff --git a/core/src/main/java/io/grpc/ServerCall.java b/core/src/main/java/io/grpc/ServerCall.java index 6d4eb9141c..d91f751cb5 100644 --- a/core/src/main/java/io/grpc/ServerCall.java +++ b/core/src/main/java/io/grpc/ServerCall.java @@ -99,7 +99,7 @@ public abstract class ServerCall { * *

Servers use this mechanism to provide back-pressure to the client for flow-control. * - *

This method is safe to call from multiple threads without external synchronizaton. + *

This method is safe to call from multiple threads without external synchronization. * * @param numMessages the requested number of messages to be delivered to the listener. */ diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 6f1bd12805..1cab55ad30 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -46,6 +46,8 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; +import io.grpc.perfmark.PerfMark; +import io.grpc.perfmark.PerfTag; import java.io.InputStream; import java.nio.charset.Charset; import java.util.concurrent.CancellationException; @@ -67,6 +69,7 @@ final class ClientCallImpl extends ClientCall { = "gzip".getBytes(Charset.forName("US-ASCII")); private final MethodDescriptor method; + private final PerfTag tag; private final Executor callExecutor; private final CallTracer channelCallsTracer; private final Context context; @@ -92,6 +95,8 @@ final class ClientCallImpl extends ClientCall { CallTracer channelCallsTracer, boolean retryEnabled) { this.method = method; + // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl. + this.tag = PerfTag.create(PerfTag.allocateNumericId(), method.getFullMethodName()); // If we know that the executor is a direct executor, we don't need to wrap it with a // SerializingExecutor. This is purely for performance reasons. // See https://github.com/grpc/grpc-java/issues/368 @@ -177,7 +182,17 @@ final class ClientCallImpl extends ClientCall { } @Override - public void start(final Listener observer, Metadata headers) { + public void start(Listener observer, Metadata headers) { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ClientCall.start"); + try { + startInternal(observer, headers); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void startInternal(final Listener observer, Metadata headers) { checkState(stream == null, "Already started"); checkState(!cancelCalled, "call was cancelled"); checkNotNull(observer, "observer"); @@ -371,6 +386,16 @@ final class ClientCallImpl extends ClientCall { @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ClientCall.cancel"); + try { + cancelInternal(message, cause); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void cancelInternal(@Nullable String message, @Nullable Throwable cause) { if (message == null && cause == null) { cause = new CancellationException("Cancelled without a message or cause"); log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause); @@ -401,6 +426,16 @@ final class ClientCallImpl extends ClientCall { @Override public void halfClose() { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ClientCall.halfClose"); + try { + halfCloseInternal(); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void halfCloseInternal() { checkState(stream != null, "Not started"); checkState(!cancelCalled, "call was cancelled"); checkState(!halfCloseCalled, "call already half-closed"); @@ -410,6 +445,16 @@ final class ClientCallImpl extends ClientCall { @Override public void sendMessage(ReqT message) { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ClientCall.sendMessage"); + try { + sendMessageInternal(message); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void sendMessageInternal(ReqT message) { checkState(stream != null, "Not started"); checkState(!cancelCalled, "call was cancelled"); checkState(!halfCloseCalled, "call was half-closed"); @@ -467,6 +512,7 @@ final class ClientCallImpl extends ClientCall { private class ClientStreamListenerImpl implements ClientStreamListener { private final Listener observer; private boolean closed; + private final long listenerScopeId = PerfTag.allocateNumericId(); public ClientStreamListenerImpl(Listener observer) { this.observer = checkNotNull(observer, "observer"); @@ -474,23 +520,27 @@ final class ClientCallImpl extends ClientCall { @Override public void headersRead(final Metadata headers) { - class HeadersRead extends ContextRunnable { + final class HeadersRead extends ContextRunnable { HeadersRead() { super(context); } @Override public final void runInContext() { + if (closed) { + return; + } + PerfMark.taskStart( + listenerScopeId, Thread.currentThread().getName(), tag, "ClientCall.headersRead"); try { - if (closed) { - return; - } observer.onHeaders(headers); } catch (Throwable t) { Status status = Status.CANCELLED.withCause(t).withDescription("Failed to read headers"); stream.cancel(status); close(status, new Metadata()); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); } } } @@ -500,7 +550,7 @@ final class ClientCallImpl extends ClientCall { @Override public void messagesAvailable(final MessageProducer producer) { - class MessagesAvailable extends ContextRunnable { + final class MessagesAvailable extends ContextRunnable { MessagesAvailable() { super(context); } @@ -511,9 +561,13 @@ final class ClientCallImpl extends ClientCall { GrpcUtil.closeQuietly(producer); return; } - - InputStream message; + PerfMark.taskStart( + listenerScopeId, + Thread.currentThread().getName(), + tag, + "ClientCall.messagesAvailable"); try { + InputStream message; while ((message = producer.next()) != null) { try { observer.onMessage(method.parseResponse(message)); @@ -529,6 +583,8 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to read message."); stream.cancel(status); close(status, new Metadata()); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); } } } @@ -570,7 +626,7 @@ final class ClientCallImpl extends ClientCall { } final Status savedStatus = status; final Metadata savedTrailers = trailers; - class StreamClosed extends ContextRunnable { + final class StreamClosed extends ContextRunnable { StreamClosed() { super(context); } @@ -581,7 +637,13 @@ final class ClientCallImpl extends ClientCall { // We intentionally don't keep the status or metadata from the server. return; } - close(savedStatus, savedTrailers); + PerfMark.taskStart( + listenerScopeId, Thread.currentThread().getName(), tag, "ClientCall.closed"); + try { + close(savedStatus, savedTrailers); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); + } } } @@ -590,13 +652,15 @@ final class ClientCallImpl extends ClientCall { @Override public void onReady() { - class StreamOnReady extends ContextRunnable { + final class StreamOnReady extends ContextRunnable { StreamOnReady() { super(context); } @Override public final void runInContext() { + PerfMark.taskStart( + listenerScopeId, Thread.currentThread().getName(), tag, "ClientCall.onReady"); try { observer.onReady(); } catch (Throwable t) { @@ -604,6 +668,8 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."); stream.cancel(status); close(status, new Metadata()); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); } } } diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index 0fc52ec34f..4f418ec7c7 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -36,6 +36,8 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.Status; +import io.grpc.perfmark.PerfMark; +import io.grpc.perfmark.PerfTag; import java.io.InputStream; import java.util.logging.Level; import java.util.logging.Logger; @@ -51,6 +53,7 @@ final class ServerCallImpl extends ServerCall { private final ServerStream stream; private final MethodDescriptor method; + private final PerfTag tag; private final Context.CancellableContext context; private final byte[] messageAcceptEncoding; private final DecompressorRegistry decompressorRegistry; @@ -70,6 +73,8 @@ final class ServerCallImpl extends ServerCall { CallTracer serverCallTracer) { this.stream = stream; this.method = method; + // TODO(carl-mastrangelo): consider moving this to the ServerImpl to record startCall. + this.tag = PerfTag.create(PerfTag.allocateNumericId(), method.getFullMethodName()); this.context = context; this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY); this.decompressorRegistry = decompressorRegistry; @@ -85,6 +90,16 @@ final class ServerCallImpl extends ServerCall { @Override public void sendHeaders(Metadata headers) { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ServerCall.sendHeaders"); + try { + sendHeadersInternal(headers); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void sendHeadersInternal(Metadata headers) { checkState(!sendHeadersCalled, "sendHeaders has already been called"); checkState(!closeCalled, "call is closed"); @@ -125,6 +140,16 @@ final class ServerCallImpl extends ServerCall { @Override public void sendMessage(RespT message) { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ServerCall.sendMessage"); + try { + sendMessageInternal(message); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void sendMessageInternal(RespT message) { checkState(sendHeadersCalled, "sendHeaders has not been called"); checkState(!closeCalled, "call is closed"); @@ -169,6 +194,16 @@ final class ServerCallImpl extends ServerCall { @Override public void close(Status status, Metadata trailers) { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ServerCall.close"); + try { + closeInternal(status, trailers); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void closeInternal(Status status, Metadata trailers) { checkState(!closeCalled, "call already closed"); try { closeCalled = true; @@ -228,6 +263,7 @@ final class ServerCallImpl extends ServerCall { private final ServerCallImpl call; private final ServerCall.Listener listener; private final Context.CancellableContext context; + private final long listenerScopeId = PerfTag.allocateNumericId(); public ServerStreamListenerImpl( ServerCallImpl call, ServerCall.Listener listener, @@ -256,6 +292,11 @@ final class ServerCallImpl extends ServerCall { return; } + PerfMark.taskStart( + listenerScopeId, + Thread.currentThread().getName(), + call.tag, + "ServerCall.messagesAvailable"); InputStream message; try { while ((message = producer.next()) != null) { @@ -271,6 +312,8 @@ final class ServerCallImpl extends ServerCall { GrpcUtil.closeQuietly(producer); MoreThrowables.throwIfUnchecked(t); throw new RuntimeException(t); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); } } @@ -280,22 +323,42 @@ final class ServerCallImpl extends ServerCall { return; } - listener.onHalfClose(); + PerfMark.taskStart( + listenerScopeId, + Thread.currentThread().getName(), + call.tag, + "ServerCall.halfClosed"); + + try { + listener.onHalfClose(); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); + } } @Override public void closed(Status status) { + PerfMark.taskStart( + listenerScopeId, + Thread.currentThread().getName(), + call.tag, + "ServerCall.closed"); try { - if (status.isOk()) { - listener.onComplete(); - } else { - call.cancelled = true; - listener.onCancel(); + try { + if (status.isOk()) { + listener.onComplete(); + } else { + call.cancelled = true; + listener.onCancel(); + } + } finally { + // Cancel context after delivering RPC closure notification to allow the application to + // clean up and update any state based on whether onComplete or onCancel was called. + context.cancel(null); + } } finally { - // Cancel context after delivering RPC closure notification to allow the application to - // clean up and update any state based on whether onComplete or onCancel was called. - context.cancel(null); + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); } } @@ -304,7 +367,16 @@ final class ServerCallImpl extends ServerCall { if (call.cancelled) { return; } - listener.onReady(); + PerfMark.taskStart( + listenerScopeId, + Thread.currentThread().getName(), + call.tag, + "ServerCall.closed"); + try { + listener.onReady(); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); + } } } } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index d08abf7b52..4e4c3e1437 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -596,6 +596,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume ServerStream stream, Metadata headers, Context.CancellableContext context) { + ServerCallImpl call = new ServerCallImpl<>( stream, methodDef.getMethodDescriptor(),