From faf6ff9f983d39bcf8fa6e768463f8695b722a25 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Wed, 3 Apr 2019 18:16:22 -0700 Subject: [PATCH] core: add client and server call perfmark annotations This code is highly experimental, so we can change it at will later. This PR is to go ahead and try it out. Perfmark task calls are added to the client and server calls public APIs, recording when the calls begin and end. They use separate scope IDs (roughly these are Thread IDs) for the listener and the call due to no synchronization between them. However, they both use the same PerfTags, which allows them to be associated. In the future, we can plumb the tag down into the stream to include deeper information about whats going on in a call. --- core/BUILD.bazel | 16 ++++ core/src/main/java/io/grpc/ServerCall.java | 2 +- .../java/io/grpc/internal/ClientCallImpl.java | 88 +++++++++++++++--- .../java/io/grpc/internal/ServerCallImpl.java | 92 +++++++++++++++++-- .../java/io/grpc/internal/ServerImpl.java | 1 + 5 files changed, 177 insertions(+), 22 deletions(-) diff --git a/core/BUILD.bazel b/core/BUILD.bazel index 61fa3bbf89..e1769e91e0 100644 --- a/core/BUILD.bazel +++ b/core/BUILD.bazel @@ -40,6 +40,7 @@ java_library( visibility = ["//:__subpackages__"], deps = [ ":core", + ":perfmark", "//context", "@com_google_android_annotations//jar", "@com_google_code_findbugs_jsr305//jar", @@ -70,3 +71,18 @@ java_library( "@com_google_j2objc_j2objc_annotations//jar", ], ) + +java_library( + name = "perfmark", + srcs = glob( + [ + "src/main/java/io/grpc/perfmark/*.java", + ], + exclude = ["src/main/java/io/grpc/perfmark/package-info.java"], + ), + visibility = ["//:__subpackages__"], + deps = [ + "@com_google_code_findbugs_jsr305//jar", + "@com_google_errorprone_error_prone_annotations//jar", + ], +) diff --git a/core/src/main/java/io/grpc/ServerCall.java b/core/src/main/java/io/grpc/ServerCall.java index 6d4eb9141c..d91f751cb5 100644 --- a/core/src/main/java/io/grpc/ServerCall.java +++ b/core/src/main/java/io/grpc/ServerCall.java @@ -99,7 +99,7 @@ public abstract class ServerCall { * *

Servers use this mechanism to provide back-pressure to the client for flow-control. * - *

This method is safe to call from multiple threads without external synchronizaton. + *

This method is safe to call from multiple threads without external synchronization. * * @param numMessages the requested number of messages to be delivered to the listener. */ diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 6f1bd12805..1cab55ad30 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -46,6 +46,8 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; +import io.grpc.perfmark.PerfMark; +import io.grpc.perfmark.PerfTag; import java.io.InputStream; import java.nio.charset.Charset; import java.util.concurrent.CancellationException; @@ -67,6 +69,7 @@ final class ClientCallImpl extends ClientCall { = "gzip".getBytes(Charset.forName("US-ASCII")); private final MethodDescriptor method; + private final PerfTag tag; private final Executor callExecutor; private final CallTracer channelCallsTracer; private final Context context; @@ -92,6 +95,8 @@ final class ClientCallImpl extends ClientCall { CallTracer channelCallsTracer, boolean retryEnabled) { this.method = method; + // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl. + this.tag = PerfTag.create(PerfTag.allocateNumericId(), method.getFullMethodName()); // If we know that the executor is a direct executor, we don't need to wrap it with a // SerializingExecutor. This is purely for performance reasons. // See https://github.com/grpc/grpc-java/issues/368 @@ -177,7 +182,17 @@ final class ClientCallImpl extends ClientCall { } @Override - public void start(final Listener observer, Metadata headers) { + public void start(Listener observer, Metadata headers) { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ClientCall.start"); + try { + startInternal(observer, headers); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void startInternal(final Listener observer, Metadata headers) { checkState(stream == null, "Already started"); checkState(!cancelCalled, "call was cancelled"); checkNotNull(observer, "observer"); @@ -371,6 +386,16 @@ final class ClientCallImpl extends ClientCall { @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ClientCall.cancel"); + try { + cancelInternal(message, cause); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void cancelInternal(@Nullable String message, @Nullable Throwable cause) { if (message == null && cause == null) { cause = new CancellationException("Cancelled without a message or cause"); log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause); @@ -401,6 +426,16 @@ final class ClientCallImpl extends ClientCall { @Override public void halfClose() { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ClientCall.halfClose"); + try { + halfCloseInternal(); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void halfCloseInternal() { checkState(stream != null, "Not started"); checkState(!cancelCalled, "call was cancelled"); checkState(!halfCloseCalled, "call already half-closed"); @@ -410,6 +445,16 @@ final class ClientCallImpl extends ClientCall { @Override public void sendMessage(ReqT message) { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ClientCall.sendMessage"); + try { + sendMessageInternal(message); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void sendMessageInternal(ReqT message) { checkState(stream != null, "Not started"); checkState(!cancelCalled, "call was cancelled"); checkState(!halfCloseCalled, "call was half-closed"); @@ -467,6 +512,7 @@ final class ClientCallImpl extends ClientCall { private class ClientStreamListenerImpl implements ClientStreamListener { private final Listener observer; private boolean closed; + private final long listenerScopeId = PerfTag.allocateNumericId(); public ClientStreamListenerImpl(Listener observer) { this.observer = checkNotNull(observer, "observer"); @@ -474,23 +520,27 @@ final class ClientCallImpl extends ClientCall { @Override public void headersRead(final Metadata headers) { - class HeadersRead extends ContextRunnable { + final class HeadersRead extends ContextRunnable { HeadersRead() { super(context); } @Override public final void runInContext() { + if (closed) { + return; + } + PerfMark.taskStart( + listenerScopeId, Thread.currentThread().getName(), tag, "ClientCall.headersRead"); try { - if (closed) { - return; - } observer.onHeaders(headers); } catch (Throwable t) { Status status = Status.CANCELLED.withCause(t).withDescription("Failed to read headers"); stream.cancel(status); close(status, new Metadata()); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); } } } @@ -500,7 +550,7 @@ final class ClientCallImpl extends ClientCall { @Override public void messagesAvailable(final MessageProducer producer) { - class MessagesAvailable extends ContextRunnable { + final class MessagesAvailable extends ContextRunnable { MessagesAvailable() { super(context); } @@ -511,9 +561,13 @@ final class ClientCallImpl extends ClientCall { GrpcUtil.closeQuietly(producer); return; } - - InputStream message; + PerfMark.taskStart( + listenerScopeId, + Thread.currentThread().getName(), + tag, + "ClientCall.messagesAvailable"); try { + InputStream message; while ((message = producer.next()) != null) { try { observer.onMessage(method.parseResponse(message)); @@ -529,6 +583,8 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to read message."); stream.cancel(status); close(status, new Metadata()); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); } } } @@ -570,7 +626,7 @@ final class ClientCallImpl extends ClientCall { } final Status savedStatus = status; final Metadata savedTrailers = trailers; - class StreamClosed extends ContextRunnable { + final class StreamClosed extends ContextRunnable { StreamClosed() { super(context); } @@ -581,7 +637,13 @@ final class ClientCallImpl extends ClientCall { // We intentionally don't keep the status or metadata from the server. return; } - close(savedStatus, savedTrailers); + PerfMark.taskStart( + listenerScopeId, Thread.currentThread().getName(), tag, "ClientCall.closed"); + try { + close(savedStatus, savedTrailers); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); + } } } @@ -590,13 +652,15 @@ final class ClientCallImpl extends ClientCall { @Override public void onReady() { - class StreamOnReady extends ContextRunnable { + final class StreamOnReady extends ContextRunnable { StreamOnReady() { super(context); } @Override public final void runInContext() { + PerfMark.taskStart( + listenerScopeId, Thread.currentThread().getName(), tag, "ClientCall.onReady"); try { observer.onReady(); } catch (Throwable t) { @@ -604,6 +668,8 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."); stream.cancel(status); close(status, new Metadata()); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); } } } diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index 0fc52ec34f..4f418ec7c7 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -36,6 +36,8 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.Status; +import io.grpc.perfmark.PerfMark; +import io.grpc.perfmark.PerfTag; import java.io.InputStream; import java.util.logging.Level; import java.util.logging.Logger; @@ -51,6 +53,7 @@ final class ServerCallImpl extends ServerCall { private final ServerStream stream; private final MethodDescriptor method; + private final PerfTag tag; private final Context.CancellableContext context; private final byte[] messageAcceptEncoding; private final DecompressorRegistry decompressorRegistry; @@ -70,6 +73,8 @@ final class ServerCallImpl extends ServerCall { CallTracer serverCallTracer) { this.stream = stream; this.method = method; + // TODO(carl-mastrangelo): consider moving this to the ServerImpl to record startCall. + this.tag = PerfTag.create(PerfTag.allocateNumericId(), method.getFullMethodName()); this.context = context; this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY); this.decompressorRegistry = decompressorRegistry; @@ -85,6 +90,16 @@ final class ServerCallImpl extends ServerCall { @Override public void sendHeaders(Metadata headers) { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ServerCall.sendHeaders"); + try { + sendHeadersInternal(headers); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void sendHeadersInternal(Metadata headers) { checkState(!sendHeadersCalled, "sendHeaders has already been called"); checkState(!closeCalled, "call is closed"); @@ -125,6 +140,16 @@ final class ServerCallImpl extends ServerCall { @Override public void sendMessage(RespT message) { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ServerCall.sendMessage"); + try { + sendMessageInternal(message); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void sendMessageInternal(RespT message) { checkState(sendHeadersCalled, "sendHeaders has not been called"); checkState(!closeCalled, "call is closed"); @@ -169,6 +194,16 @@ final class ServerCallImpl extends ServerCall { @Override public void close(Status status, Metadata trailers) { + PerfMark.taskStart( + tag.getNumericTag(), Thread.currentThread().getName(), tag, "ServerCall.close"); + try { + closeInternal(status, trailers); + } finally { + PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName()); + } + } + + private void closeInternal(Status status, Metadata trailers) { checkState(!closeCalled, "call already closed"); try { closeCalled = true; @@ -228,6 +263,7 @@ final class ServerCallImpl extends ServerCall { private final ServerCallImpl call; private final ServerCall.Listener listener; private final Context.CancellableContext context; + private final long listenerScopeId = PerfTag.allocateNumericId(); public ServerStreamListenerImpl( ServerCallImpl call, ServerCall.Listener listener, @@ -256,6 +292,11 @@ final class ServerCallImpl extends ServerCall { return; } + PerfMark.taskStart( + listenerScopeId, + Thread.currentThread().getName(), + call.tag, + "ServerCall.messagesAvailable"); InputStream message; try { while ((message = producer.next()) != null) { @@ -271,6 +312,8 @@ final class ServerCallImpl extends ServerCall { GrpcUtil.closeQuietly(producer); MoreThrowables.throwIfUnchecked(t); throw new RuntimeException(t); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); } } @@ -280,22 +323,42 @@ final class ServerCallImpl extends ServerCall { return; } - listener.onHalfClose(); + PerfMark.taskStart( + listenerScopeId, + Thread.currentThread().getName(), + call.tag, + "ServerCall.halfClosed"); + + try { + listener.onHalfClose(); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); + } } @Override public void closed(Status status) { + PerfMark.taskStart( + listenerScopeId, + Thread.currentThread().getName(), + call.tag, + "ServerCall.closed"); try { - if (status.isOk()) { - listener.onComplete(); - } else { - call.cancelled = true; - listener.onCancel(); + try { + if (status.isOk()) { + listener.onComplete(); + } else { + call.cancelled = true; + listener.onCancel(); + } + } finally { + // Cancel context after delivering RPC closure notification to allow the application to + // clean up and update any state based on whether onComplete or onCancel was called. + context.cancel(null); + } } finally { - // Cancel context after delivering RPC closure notification to allow the application to - // clean up and update any state based on whether onComplete or onCancel was called. - context.cancel(null); + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); } } @@ -304,7 +367,16 @@ final class ServerCallImpl extends ServerCall { if (call.cancelled) { return; } - listener.onReady(); + PerfMark.taskStart( + listenerScopeId, + Thread.currentThread().getName(), + call.tag, + "ServerCall.closed"); + try { + listener.onReady(); + } finally { + PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName()); + } } } } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index d08abf7b52..4e4c3e1437 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -596,6 +596,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume ServerStream stream, Metadata headers, Context.CancellableContext context) { + ServerCallImpl call = new ServerCallImpl<>( stream, methodDef.getMethodDescriptor(),