From 71967622d635160172e0739541f88197f9da70a2 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Thu, 6 Jun 2019 17:58:49 -0700 Subject: [PATCH] core, netty: add io.perfmark Annotations This add perfmark annotations in some key places, notably on transport/application boundaries, and thread hop locations. Perfmark records to a thread-local buffer the events that happen in each thread. Perfmark is disabled by default, and will compile to a noop unless Perfmark.setEnabled is invoked. This should make it free when disable, and pretty fast when it is enabled. It is important that started tasks are ended, so several places in our code are moved to either try-finally blocks, or moved into a private method. I realize this is ugly, but I think it is manageable. In the future, we can look at making an agent or compiler plugin that simplifies the recording. Linking between threads is done with a Link object, which is created on the "outbound" task, and used on the "inbound" task. This is slightly more verbose, and does has a small amount of runtime overhead, even when disabled. (for null checks, slightly higher memory usage, etc.) I think this is okay to, because it makes other optimizations much easier. --- build.gradle | 1 + core/BUILD.bazel | 24 +-- core/build.gradle | 1 + .../java/io/grpc/internal/ClientCallImpl.java | 139 ++++++++++---- .../java/io/grpc/internal/ServerCallImpl.java | 98 +++++----- .../java/io/grpc/internal/ServerImpl.java | 106 +++++++++-- .../io/grpc/perfmark/InternalPerfMark.java | 49 ----- .../main/java/io/grpc/perfmark/PerfMark.java | 171 ------------------ .../java/io/grpc/perfmark/PerfMarkTask.java | 31 ---- .../java/io/grpc/perfmark/PerfMarker.java | 71 -------- .../main/java/io/grpc/perfmark/PerfTag.java | 118 ------------ .../java/io/grpc/perfmark/package-info.java | 24 --- .../io/grpc/internal/ServerCallImplTest.java | 14 +- .../java/io/grpc/internal/ServerImplTest.java | 19 +- netty/BUILD.bazel | 1 + .../java/io/grpc/netty/NettyClientStream.java | 47 ++++- .../java/io/grpc/netty/NettyServerStream.java | 63 +++++-- .../io/grpc/netty/SendGrpcFrameCommand.java | 9 + .../main/java/io/grpc/netty/WriteQueue.java | 45 ++++- repositories.bzl | 12 ++ 20 files changed, 428 insertions(+), 615 deletions(-) delete mode 100644 core/src/main/java/io/grpc/perfmark/InternalPerfMark.java delete mode 100644 core/src/main/java/io/grpc/perfmark/PerfMark.java delete mode 100644 core/src/main/java/io/grpc/perfmark/PerfMarkTask.java delete mode 100644 core/src/main/java/io/grpc/perfmark/PerfMarker.java delete mode 100644 core/src/main/java/io/grpc/perfmark/PerfTag.java delete mode 100644 core/src/main/java/io/grpc/perfmark/package-info.java diff --git a/build.gradle b/build.gradle index a573d3fab8..24152f8760 100644 --- a/build.gradle +++ b/build.gradle @@ -201,6 +201,7 @@ subprojects { opencensus_impl: "io.opencensus:opencensus-impl:${opencensusVersion}", opencensus_impl_lite: "io.opencensus:opencensus-impl-lite:${opencensusVersion}", instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3', + perfmark: 'io.perfmark:perfmark-api:0.16.0', protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}", protobuf_lite: "com.google.protobuf:protobuf-lite:3.0.1", protoc_lite: "com.google.protobuf:protoc-gen-javalite:3.0.0", diff --git a/core/BUILD.bazel b/core/BUILD.bazel index 3a22b50aac..4374e7b7d9 100644 --- a/core/BUILD.bazel +++ b/core/BUILD.bazel @@ -1,16 +1,3 @@ -PERFMARK_INTERNAL_ACCESSOR_SRCS = glob( - [ - "src/main/java/io/grpc/perfmark/Internal*.java", - ], -) - -PERFMARK_SRCS = glob( - [ - "src/main/java/io/grpc/perfmark/*.java", - ], - exclude = PERFMARK_INTERNAL_ACCESSOR_SRCS, -) - java_library( name = "core", visibility = ["//visibility:public"], @@ -43,7 +30,6 @@ java_library( ]), visibility = ["//:__subpackages__"], deps = [ - ":perfmark", "//api", "//context", "@com_google_android_annotations//jar", @@ -54,6 +40,7 @@ java_library( "@com_google_j2objc_j2objc_annotations//jar", "@io_opencensus_opencensus_api//jar", "@io_opencensus_opencensus_contrib_grpc_metrics//jar", + "@io_perfmark_perfmark_api//jar", "@org_codehaus_mojo_animal_sniffer_annotations//jar", ], ) @@ -76,12 +63,3 @@ java_library( ], ) -java_library( - name = "perfmark", - srcs = PERFMARK_SRCS, - visibility = ["//:__subpackages__"], - deps = [ - "@com_google_code_findbugs_jsr305//jar", - "@com_google_errorprone_error_prone_annotations//jar", - ], -) diff --git a/core/build.gradle b/core/build.gradle index c822177932..e57eb34446 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -4,6 +4,7 @@ dependencies { compile project(':grpc-api'), libraries.gson, libraries.android_annotations, + libraries.perfmark compile (libraries.opencensus_api) { // prefer 3.0.2 from libraries instead of 3.0.1 exclude group: 'com.google.code.findbugs', module: 'jsr305' diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index ff9e930d81..976c7c24cf 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -46,8 +46,9 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; -import io.grpc.perfmark.PerfMark; -import io.grpc.perfmark.PerfTag; +import io.perfmark.Link; +import io.perfmark.PerfMark; +import io.perfmark.Tag; import java.io.InputStream; import java.nio.charset.Charset; import java.util.concurrent.CancellationException; @@ -69,7 +70,7 @@ final class ClientCallImpl extends ClientCall { = "gzip".getBytes(Charset.forName("US-ASCII")); private final MethodDescriptor method; - private final PerfTag tag; + private final Tag tag; private final Executor callExecutor; private final CallTracer channelCallsTracer; private final Context context; @@ -96,7 +97,7 @@ final class ClientCallImpl extends ClientCall { boolean retryEnabled) { this.method = method; // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl. - this.tag = PerfMark.createTag(method.getFullMethodName()); + this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this)); // If we know that the executor is a direct executor, we don't need to wrap it with a // SerializingExecutor. This is purely for performance reasons. // See https://github.com/grpc/grpc-java/issues/368 @@ -112,6 +113,7 @@ final class ClientCallImpl extends ClientCall { this.clientTransportProvider = clientTransportProvider; this.deadlineCancellationExecutor = deadlineCancellationExecutor; this.retryEnabled = retryEnabled; + PerfMark.event("ClientCall.", tag); } private final class ContextCancellationListener implements CancellationListener { @@ -183,11 +185,11 @@ final class ClientCallImpl extends ClientCall { @Override public void start(Listener observer, Metadata headers) { - PerfMark.taskStart(tag, "ClientCall.start"); + PerfMark.startTask("ClientCall.start", tag); try { startInternal(observer, headers); } finally { - PerfMark.taskEnd(tag, "ClientCall.start"); + PerfMark.stopTask("ClientCall.start", tag); } } @@ -378,18 +380,23 @@ final class ClientCallImpl extends ClientCall { @Override public void request(int numMessages) { - checkState(stream != null, "Not started"); - checkArgument(numMessages >= 0, "Number requested must be non-negative"); - stream.request(numMessages); + PerfMark.startTask("ClientCall.request", tag); + try { + checkState(stream != null, "Not started"); + checkArgument(numMessages >= 0, "Number requested must be non-negative"); + stream.request(numMessages); + } finally { + PerfMark.stopTask("ClientCall.cancel", tag); + } } @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { - PerfMark.taskStart(tag, "ClientCall.cancel"); + PerfMark.startTask("ClientCall.cancel", tag); try { cancelInternal(message, cause); } finally { - PerfMark.taskEnd(tag, "ClientCall.cancel"); + PerfMark.stopTask("ClientCall.cancel", tag); } } @@ -424,11 +431,11 @@ final class ClientCallImpl extends ClientCall { @Override public void halfClose() { - PerfMark.taskStart(tag, "ClientCall.halfClose"); + PerfMark.startTask("ClientCall.halfClose", tag); try { halfCloseInternal(); } finally { - PerfMark.taskEnd(tag, "ClientCall.halfClose"); + PerfMark.stopTask("ClientCall.halfClose", tag); } } @@ -442,11 +449,11 @@ final class ClientCallImpl extends ClientCall { @Override public void sendMessage(ReqT message) { - PerfMark.taskStart(tag, "ClientCall.sendMessage"); + PerfMark.startTask("ClientCall.sendMessage", tag); try { sendMessageInternal(message); } finally { - PerfMark.taskEnd(tag, "ClientCall.sendMessage"); + PerfMark.stopTask("ClientCall.sendMessage", tag); } } @@ -515,17 +522,29 @@ final class ClientCallImpl extends ClientCall { @Override public void headersRead(final Metadata headers) { + PerfMark.startTask("ClientStreamListener.headersRead", tag); + final Link link = PerfMark.link(); + final class HeadersRead extends ContextRunnable { HeadersRead() { super(context); } @Override - public final void runInContext() { + public void runInContext() { + PerfMark.startTask("ClientCall$Listener.headersRead", tag); + link.link(); + try { + runInternal(); + } finally { + PerfMark.stopTask("ClientCall$Listener.headersRead", tag); + } + } + + private void runInternal() { if (closed) { return; } - PerfMark.taskStart(tag, "ClientCall.headersRead"); try { observer.onHeaders(headers); } catch (Throwable t) { @@ -533,29 +552,43 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to read headers"); stream.cancel(status); close(status, new Metadata()); - } finally { - PerfMark.taskEnd(tag, "ClientCall.headersRead"); } } } - callExecutor.execute(new HeadersRead()); + try { + callExecutor.execute(new HeadersRead()); + } finally { + PerfMark.stopTask("ClientStreamListener.headersRead", tag); + } } @Override public void messagesAvailable(final MessageProducer producer) { + PerfMark.startTask("ClientStreamListener.messagesAvailable", tag); + final Link link = PerfMark.link(); + final class MessagesAvailable extends ContextRunnable { MessagesAvailable() { super(context); } @Override - public final void runInContext() { + public void runInContext() { + PerfMark.startTask("ClientCall$Listener.messagesAvailable", tag); + link.link(); + try { + runInternal(); + } finally { + PerfMark.stopTask("ClientCall$Listener.messagesAvailable", tag); + } + } + + private void runInternal() { if (closed) { GrpcUtil.closeQuietly(producer); return; } - PerfMark.taskStart(tag, "ClientCall.messagesAvailable"); try { InputStream message; while ((message = producer.next()) != null) { @@ -573,13 +606,15 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to read message."); stream.cancel(status); close(status, new Metadata()); - } finally { - PerfMark.taskEnd(tag, "ClientCall.messagesAvailable"); } } } - callExecutor.execute(new MessagesAvailable()); + try { + callExecutor.execute(new MessagesAvailable()); + } finally { + PerfMark.stopTask("ClientStreamListener.messagesAvailable", tag); + } } /** @@ -603,6 +638,16 @@ final class ClientCallImpl extends ClientCall { @Override public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { + PerfMark.startTask("ClientStreamListener.closed", tag); + try { + closedInternal(status, rpcProgress, trailers); + } finally { + PerfMark.stopTask("ClientStreamListener.closed", tag); + } + } + + private void closedInternal( + Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) { Deadline deadline = effectiveDeadline(); if (status.getCode() == Status.Code.CANCELLED && deadline != null) { // When the server's deadline expires, it can only reset the stream with CANCEL and no @@ -616,23 +661,29 @@ final class ClientCallImpl extends ClientCall { } final Status savedStatus = status; final Metadata savedTrailers = trailers; + final Link link = PerfMark.link(); final class StreamClosed extends ContextRunnable { StreamClosed() { super(context); } @Override - public final void runInContext() { + public void runInContext() { + PerfMark.startTask("ClientCall$Listener.onClose", tag); + link.link(); + try { + runInternal(); + } finally { + PerfMark.stopTask("ClientCall$Listener.onClose", tag); + } + } + + private void runInternal() { if (closed) { // We intentionally don't keep the status or metadata from the server. return; } - PerfMark.taskStart(tag, "ClientCall.closed"); - try { - close(savedStatus, savedTrailers); - } finally { - PerfMark.taskEnd(tag, "ClientCall.closed"); - } + close(savedStatus, savedTrailers); } } @@ -641,14 +692,26 @@ final class ClientCallImpl extends ClientCall { @Override public void onReady() { + PerfMark.startTask("ClientStreamListener.onReady", tag); + final Link link = PerfMark.link(); + final class StreamOnReady extends ContextRunnable { StreamOnReady() { super(context); } @Override - public final void runInContext() { - PerfMark.taskStart(tag, "ClientCall.onReady"); + public void runInContext() { + PerfMark.startTask("ClientCall$Listener.onReady", tag); + link.link(); + try { + runInternal(); + } finally { + PerfMark.stopTask("ClientCall$Listener.onReady", tag); + } + } + + private void runInternal() { try { observer.onReady(); } catch (Throwable t) { @@ -656,13 +719,15 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."); stream.cancel(status); close(status, new Metadata()); - } finally { - PerfMark.taskEnd(tag, "ClientCall.onReady"); } } } - callExecutor.execute(new StreamOnReady()); + try { + callExecutor.execute(new StreamOnReady()); + } finally { + PerfMark.stopTask("ClientStreamListener.onReady", tag); + } } } } diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index ec50c76414..6f123e7667 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -37,8 +37,8 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.Status; -import io.grpc.perfmark.PerfMark; -import io.grpc.perfmark.PerfTag; +import io.perfmark.PerfMark; +import io.perfmark.Tag; import java.io.InputStream; import java.util.logging.Level; import java.util.logging.Logger; @@ -54,7 +54,7 @@ final class ServerCallImpl extends ServerCall { private final ServerStream stream; private final MethodDescriptor method; - private final PerfTag tag; + private final Tag tag; private final Context.CancellableContext context; private final byte[] messageAcceptEncoding; private final DecompressorRegistry decompressorRegistry; @@ -71,31 +71,35 @@ final class ServerCallImpl extends ServerCall { ServerCallImpl(ServerStream stream, MethodDescriptor method, Metadata inboundHeaders, Context.CancellableContext context, DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry, - CallTracer serverCallTracer) { + CallTracer serverCallTracer, Tag tag) { this.stream = stream; this.method = method; - // TODO(carl-mastrangelo): consider moving this to the ServerImpl to record startCall. - this.tag = PerfMark.createTag(method.getFullMethodName()); this.context = context; this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY); this.decompressorRegistry = decompressorRegistry; this.compressorRegistry = compressorRegistry; this.serverCallTracer = serverCallTracer; this.serverCallTracer.reportCallStarted(); + this.tag = tag; } @Override public void request(int numMessages) { - stream.request(numMessages); + PerfMark.startTask("ServerCall.request", tag); + try { + stream.request(numMessages); + } finally { + PerfMark.stopTask("ServerCall.request", tag); + } } @Override public void sendHeaders(Metadata headers) { - PerfMark.taskStart(tag, "ServerCall.sendHeaders"); + PerfMark.startTask("ServerCall.sendHeaders", tag); try { sendHeadersInternal(headers); } finally { - PerfMark.taskEnd(tag, "ServerCall.sendHeaders"); + PerfMark.stopTask("ServerCall.sendHeaders", tag); } } @@ -140,11 +144,11 @@ final class ServerCallImpl extends ServerCall { @Override public void sendMessage(RespT message) { - PerfMark.taskStart(tag, "ServerCall.sendMessage"); + PerfMark.startTask("ServerCall.sendMessage", tag); try { sendMessageInternal(message); } finally { - PerfMark.taskEnd(tag, "ServerCall.sendMessage"); + PerfMark.stopTask("ServerCall.sendMessage", tag); } } @@ -193,11 +197,11 @@ final class ServerCallImpl extends ServerCall { @Override public void close(Status status, Metadata trailers) { - PerfMark.taskStart(tag, "ServerCall.close"); + PerfMark.startTask("ServerCall.close", tag); try { closeInternal(status, trailers); } finally { - PerfMark.taskEnd(tag, "ServerCall.close"); + PerfMark.stopTask("ServerCall.close", tag); } } @@ -281,15 +285,23 @@ final class ServerCallImpl extends ServerCall { MoreExecutors.directExecutor()); } - @SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try @Override - public void messagesAvailable(final MessageProducer producer) { + public void messagesAvailable(MessageProducer producer) { + PerfMark.startTask("ServerStreamListener.messagesAvailable", call.tag); + try { + messagesAvailableInternal(producer); + } finally { + PerfMark.stopTask("ServerStreamListener.messagesAvailable", call.tag); + } + } + + @SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try + private void messagesAvailableInternal(final MessageProducer producer) { if (call.cancelled) { GrpcUtil.closeQuietly(producer); return; } - PerfMark.taskStart(call.tag, "ServerCall.messagesAvailable"); InputStream message; try { while ((message = producer.next()) != null) { @@ -305,58 +317,58 @@ final class ServerCallImpl extends ServerCall { GrpcUtil.closeQuietly(producer); Throwables.throwIfUnchecked(t); throw new RuntimeException(t); - } finally { - PerfMark.taskEnd(call.tag, "ServerCall.messagesAvailable"); } } @Override public void halfClosed() { - if (call.cancelled) { - return; - } - - PerfMark.taskStart(call.tag, "ServerCall.halfClosed"); - + PerfMark.startTask("ServerStreamListener.halfClosed", call.tag); try { + if (call.cancelled) { + return; + } + listener.onHalfClose(); } finally { - PerfMark.taskEnd(call.tag, "ServerCall.halfClosed"); + PerfMark.stopTask("ServerStreamListener.halfClosed", call.tag); } } @Override public void closed(Status status) { - PerfMark.taskStart(call.tag, "ServerCall.closed"); + PerfMark.startTask("ServerStreamListener.closed", call.tag); try { - try { - if (status.isOk()) { - listener.onComplete(); - } else { - call.cancelled = true; - listener.onCancel(); - } - } finally { - // Cancel context after delivering RPC closure notification to allow the application to - // clean up and update any state based on whether onComplete or onCancel was called. - context.cancel(null); + closedInternal(status); + } finally { + PerfMark.stopTask("ServerStreamListener.closed", call.tag); + } + } + private void closedInternal(Status status) { + try { + if (status.isOk()) { + listener.onComplete(); + } else { + call.cancelled = true; + listener.onCancel(); } } finally { - PerfMark.taskEnd(call.tag, "ServerCall.closed"); + // Cancel context after delivering RPC closure notification to allow the application to + // clean up and update any state based on whether onComplete or onCancel was called. + context.cancel(null); } } @Override public void onReady() { - if (call.cancelled) { - return; - } - PerfMark.taskStart(call.tag, "ServerCall.closed"); + PerfMark.startTask("ServerStreamListener.onReady", call.tag); try { + if (call.cancelled) { + return; + } listener.onReady(); } finally { - PerfMark.taskEnd(call.tag, "ServerCall.closed"); + PerfMark.stopTask("ServerCall.closed", call.tag); } } } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 4e4c3e1437..886fa0ff7e 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -50,6 +50,9 @@ import io.grpc.ServerMethodDefinition; import io.grpc.ServerServiceDefinition; import io.grpc.ServerTransportFilter; import io.grpc.Status; +import io.perfmark.Link; +import io.perfmark.PerfMark; +import io.perfmark.Tag; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; @@ -460,9 +463,21 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume transportClosed(transport); } + @Override - public void streamCreated( - final ServerStream stream, final String methodName, final Metadata headers) { + public void streamCreated(ServerStream stream, String methodName, Metadata headers) { + Tag tag = PerfMark.createTag(methodName, stream.hashCode()); + PerfMark.startTask("ServerTransportListener.streamCreated", tag); + try { + streamCreatedInternal(stream, methodName, headers, tag); + } finally { + PerfMark.stopTask("ServerTransportListener.streamCreated", tag); + } + } + + private void streamCreatedInternal( + final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) { + if (headers.containsKey(MESSAGE_ENCODING_KEY)) { String encoding = headers.get(MESSAGE_ENCODING_KEY); Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding); @@ -489,22 +504,33 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume wrappedExecutor = new SerializingExecutor(executor); } + final Link link = PerfMark.link(); + final JumpToApplicationThreadServerStreamListener jumpListener = new JumpToApplicationThreadServerStreamListener( - wrappedExecutor, executor, stream, context); + wrappedExecutor, executor, stream, context, tag); stream.setListener(jumpListener); // Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks // are delivered, including any errors. Callbacks can still be triggered, but they will be // queued. final class StreamCreated extends ContextRunnable { - StreamCreated() { super(context); } @Override public void runInContext() { + PerfMark.startTask("ServerTransportListener$StreamCreated.startCall", tag); + link.link(); + try { + runInternal(); + } finally { + PerfMark.stopTask("ServerTransportListener$StreamCreated.startCall", tag); + } + } + + private void runInternal() { ServerStreamListener listener = NOOP_LISTENER; try { ServerMethodDefinition method = registry.lookupMethod(methodName); @@ -523,7 +549,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume context.cancel(null); return; } - listener = startCall(stream, methodName, method, headers, context, statsTraceCtx); + listener = + startCall(stream, methodName, method, headers, context, statsTraceCtx, tag); } catch (RuntimeException e) { stream.close(Status.fromThrowable(e), new Metadata()); context.cancel(null); @@ -573,7 +600,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume /** Never returns {@code null}. */ private ServerStreamListener startCall(ServerStream stream, String fullMethodName, ServerMethodDefinition methodDef, Metadata headers, - Context.CancellableContext context, StatsTraceContext statsTraceCtx) { + Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) { // TODO(ejona86): should we update fullMethodName to have the canonical path of the method? statsTraceCtx.serverCallStarted( new ServerCallInfoImpl<>( @@ -587,7 +614,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume ServerMethodDefinition interceptedDef = methodDef.withServerCallHandler(handler); ServerMethodDefinition wMethodDef = binlog == null ? interceptedDef : binlog.wrapMethodDefinition(interceptedDef); - return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context); + return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag); } private ServerStreamListener startWrappedCall( @@ -595,7 +622,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume ServerMethodDefinition methodDef, ServerStream stream, Metadata headers, - Context.CancellableContext context) { + Context.CancellableContext context, + Tag tag) { ServerCallImpl call = new ServerCallImpl<>( stream, @@ -604,7 +632,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume context, decompressorRegistry, compressorRegistry, - serverCallTracer); + serverCallTracer, + tag); ServerCall.Listener listener = methodDef.getServerCallHandler().startCall(call, headers); @@ -687,15 +716,17 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume private final Executor cancelExecutor; private final Context.CancellableContext context; private final ServerStream stream; + private final Tag tag; // Only accessed from callExecutor. private ServerStreamListener listener; public JumpToApplicationThreadServerStreamListener(Executor executor, - Executor cancelExecutor, ServerStream stream, Context.CancellableContext context) { + Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) { this.callExecutor = executor; this.cancelExecutor = cancelExecutor; this.stream = stream; this.context = context; + this.tag = tag; } /** @@ -725,6 +756,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void messagesAvailable(final MessageProducer producer) { + PerfMark.startTask("ServerStreamListener.messagesAvailable", tag); + final Link link = PerfMark.link(); final class MessagesAvailable extends ContextRunnable { @@ -734,6 +767,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { + PerfMark.startTask("ServerCallListener(app).messagesAvailable", tag); + link.link(); try { getListener().messagesAvailable(producer); } catch (RuntimeException e) { @@ -742,15 +777,24 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume } catch (Error e) { internalClose(); throw e; + } finally { + PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag); } } } - callExecutor.execute(new MessagesAvailable()); + try { + callExecutor.execute(new MessagesAvailable()); + } finally { + PerfMark.stopTask("ServerStreamListener.messagesAvailable", tag); + } } @Override public void halfClosed() { + PerfMark.startTask("ServerStreamListener.halfClosed", tag); + final Link link = PerfMark.link(); + final class HalfClosed extends ContextRunnable { HalfClosed() { super(context); @@ -758,6 +802,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { + PerfMark.startTask("ServerCallListener(app).halfClosed", tag); + link.link(); try { getListener().halfClosed(); } catch (RuntimeException e) { @@ -766,15 +812,30 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume } catch (Error e) { internalClose(); throw e; + } finally { + PerfMark.stopTask("ServerCallListener(app).halfClosed", tag); } } } - callExecutor.execute(new HalfClosed()); + try { + callExecutor.execute(new HalfClosed()); + } finally { + PerfMark.stopTask("ServerStreamListener.halfClosed", tag); + } } @Override public void closed(final Status status) { + PerfMark.startTask("ServerStreamListener.closed", tag); + try { + closedInternal(status); + } finally { + PerfMark.stopTask("ServerStreamListener.closed", tag); + } + } + + private void closedInternal(final Status status) { // For cancellations, promptly inform any users of the context that their work should be // aborted. Otherwise, we can wait until pending work is done. if (!status.isOk()) { @@ -782,6 +843,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume // is not serializing. cancelExecutor.execute(new ContextCloser(context, status.getCause())); } + final Link link = PerfMark.link(); final class Closed extends ContextRunnable { Closed() { @@ -790,7 +852,13 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { - getListener().closed(status); + PerfMark.startTask("ServerCallListener(app).closed", tag); + link.link(); + try { + getListener().closed(status); + } finally { + PerfMark.stopTask("ServerCallListener(app).closed", tag); + } } } @@ -799,6 +867,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void onReady() { + PerfMark.startTask("ServerStreamListener.onReady", tag); + final Link link = PerfMark.link(); final class OnReady extends ContextRunnable { OnReady() { super(context); @@ -806,6 +876,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { + PerfMark.startTask("ServerCallListener(app).onReady", tag); + link.link(); try { getListener().onReady(); } catch (RuntimeException e) { @@ -814,11 +886,17 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume } catch (Error e) { internalClose(); throw e; + } finally { + PerfMark.stopTask("ServerCallListener(app).onReady", tag); } } } - callExecutor.execute(new OnReady()); + try { + callExecutor.execute(new OnReady()); + } finally { + PerfMark.stopTask("ServerStreamListener.onReady", tag); + } } } diff --git a/core/src/main/java/io/grpc/perfmark/InternalPerfMark.java b/core/src/main/java/io/grpc/perfmark/InternalPerfMark.java deleted file mode 100644 index 61a884380d..0000000000 --- a/core/src/main/java/io/grpc/perfmark/InternalPerfMark.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.perfmark; - - -/** - * Internal {@link PerfTag.TagFactory} and {@link PerfMarkTask} accessor. This is intended for use - * by io.grpc.perfmark, and the specifically supported packages that utilize PerfMark. If you - * *really* think you need to use this, contact the gRPC team first. - */ -public final class InternalPerfMark { - - private InternalPerfMark() {} - - /** Expose class to allow packages that utilize PerfMark to get PerfMarkTask instances. */ - public abstract static class InternalPerfMarkTask extends PerfMarkTask { - public InternalPerfMarkTask() {} - } - - /** Expose methods that create PerfTag to packages that utilize PerfMark. */ - private static final long NULL_NUMERIC_TAG = 0; - private static final String NULL_STRING_TAG = null; - - public static PerfTag createPerfTag(long numericTag, String stringTag) { - return PerfTag.TagFactory.create(numericTag, stringTag); - } - - public static PerfTag createPerfTag(String stringTag) { - return PerfTag.TagFactory.create(NULL_NUMERIC_TAG, stringTag); - } - - public static PerfTag createPerfTag(long numericTag) { - return PerfTag.TagFactory.create(numericTag, NULL_STRING_TAG); - } -} diff --git a/core/src/main/java/io/grpc/perfmark/PerfMark.java b/core/src/main/java/io/grpc/perfmark/PerfMark.java deleted file mode 100644 index f30363e7d4..0000000000 --- a/core/src/main/java/io/grpc/perfmark/PerfMark.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.perfmark; - -import com.google.errorprone.annotations.CompileTimeConstant; -import io.grpc.perfmark.PerfTag.TagFactory; - -/** - * PerfMark is a collection of stub methods for marking key points in the RPC lifecycle. This - * class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this yet. - */ -public final class PerfMark { - private PerfMark() { - throw new AssertionError("nope"); - } - - /** - * Start a Task with a Tag to identify it; a task represents some work that spans some time, and - * you are interested in both its start time and end time. - * - * @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't - * use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task - * does not have a numeric tag associated. In this case, you are encouraged to use {@link - * #taskStart(String)} or {@link PerfTag#create(String)}. - * @param taskName The name of the task. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this task. - */ - public static void taskStart(PerfTag tag, @CompileTimeConstant String taskName) {} - - /** - * Start a Task; a task represents some work that spans some time, and you are interested in both - * its start time and end time. - * - * @param taskName The name of the task. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this task. - */ - public static void taskStart(@CompileTimeConstant String taskName) {} - - /** - * End a Task with a Tag to identify it; a task represents some work that spans some time, and - * you are interested in both its start time and end time. - * - * @param tag a Tag object associated with the task start. This should be the tag used for the - * corresponding {@link #taskStart(PerfTag, String)} call. - * @param taskName The name of the task. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this task. This should - * be the name used by the corresponding {@link #taskStart(PerfTag, String)} call. - */ - public static void taskEnd(PerfTag tag, @CompileTimeConstant String taskName) {} - - /** - * End a Task with a Tag to identify it; a task represents some work that spans some time, and - * you are interested in both its start time and end time. - * - * @param taskName The name of the task. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this task. This should - * be the name used by the corresponding {@link #taskStart(String)} call. - */ - public static void taskEnd(@CompileTimeConstant String taskName) {} - - /** - * Start a Task with a Tag to identify it in a try-with-resource statement; a task represents some - * work that spans some time, and you are interested in both its start time and end time. - * - *

Use this in a try-with-resource statement so that task will end automatically. - * - * @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't - * use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task - * does not have a numeric tag associated. In this case, you are encouraged to use {@link - * #task(String)} or {@link PerfTag#create(String)}. - * @param taskName The name of the task. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this task. - */ - public static PerfMarkTask task(PerfTag tag, @CompileTimeConstant String taskName) { - return NoopTask.INSTANCE; - } - - /** - * Start a Task it in a try-with-resource statement; a task represents some work that spans some - * time, and you are interested in both its start time and end time. - * - *

Use this in a try-with-resource statement so that task will end automatically. - * - * @param taskName The name of the task. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this task. - */ - public static PerfMarkTask task(@CompileTimeConstant String taskName) { - return NoopTask.INSTANCE; - } - - /** - * Records an Event with a Tag to identify it. - * - *

An Event is different from a Task in that you don't care how much time it spanned. You are - * interested in only the time it happened. - * - * @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't - * use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task - * does not have a numeric tag associated. In this case, you are encouraged to use {@link - * #event(String)} or {@link PerfTag#create(String)}. - * @param eventName The name of the event. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this event. - */ - public static void event(PerfTag tag, @CompileTimeConstant String eventName) {} - - /** - * Records an Event. - * - *

An Event is different from a Task in that you don't care how much time it spanned. You are - * interested in only the time it happened. - * - * @param eventName The name of the event. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this event. - */ - public static void event(@CompileTimeConstant String eventName) {} - - /** - * If PerfMark instrumentation is not enabled, returns a Tag with numericTag = 0L. Replacement - * for {@link TagFactory#create(long, String)} if PerfMark agent is enabled. - * - */ - public static PerfTag createTag( - @SuppressWarnings("unused") long numericTag, @SuppressWarnings("unused") String stringTag) { - // Warning suppression is safe as this method returns by default the NULL_PERF_TAG - return NULL_PERF_TAG; - } - - /** - * If PerfMark instrumentation is not enabled returns a Tag with numericTag = 0L. Replacement - * for {@link TagFactory#create(String)} if PerfMark agent is enabled. - */ - public static PerfTag createTag(@SuppressWarnings("unused") String stringTag) { - // Warning suppression is safe as this method returns by default the NULL_PERF_TAG - return NULL_PERF_TAG; - } - - /** - * If PerfMark instrumentation is not enabled returns a Tag with numericTag = 0L. Replacement - * for {@link TagFactory#create(long)} if PerfMark agent is enabled. - */ - public static PerfTag createTag(@SuppressWarnings("unused") long numericTag) { - // Warning suppression is safe as this method returns by default the NULL_PERF_TAG - return NULL_PERF_TAG; - } - - private static final PerfTag NULL_PERF_TAG = TagFactory.create(); - - private static final class NoopTask extends PerfMarkTask { - - private static final PerfMarkTask INSTANCE = new NoopTask(); - - NoopTask() {} - - @Override - public void close() {} - } -} diff --git a/core/src/main/java/io/grpc/perfmark/PerfMarkTask.java b/core/src/main/java/io/grpc/perfmark/PerfMarkTask.java deleted file mode 100644 index 4a689d8ab1..0000000000 --- a/core/src/main/java/io/grpc/perfmark/PerfMarkTask.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.perfmark; - -import java.io.Closeable; - -/** - * This class exists to make it easier for users to use the try-with-resource shorthand for - * starting and ending a PerfMark Task. This class is {@link io.grpc.Internal} and - * {@link io.grpc.ExperimentalApi}. Do not use this yet. - */ -public abstract class PerfMarkTask implements Closeable { - @Override - public abstract void close(); - - PerfMarkTask() {} -} diff --git a/core/src/main/java/io/grpc/perfmark/PerfMarker.java b/core/src/main/java/io/grpc/perfmark/PerfMarker.java deleted file mode 100644 index b39b85f526..0000000000 --- a/core/src/main/java/io/grpc/perfmark/PerfMarker.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.perfmark; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Annotation to add PerfMark instrumentation points surrounding method invocation. - * - *

This class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this - * yet. - */ -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.METHOD) -// TODO(carl-mastrangelo): Add this line back in and make it okay on Android -//@IncompatibleModifiers(value = {Modifier.ABSTRACT, Modifier.NATIVE}) -public @interface PerfMarker { - - /** - * The name of the task; e.g. `parseHeaders`. - */ - String taskName(); - - /** - * An optional computed tag. - * - *

There are 3 supported references that can be used - *

    - *
  • {@code "this"}: Then the tag will be the {@link Object#toString} of the current class. - * Only valid for instance methods. - *
  • {@code "someFieldName"}: Then the tag will be the result of - * calling {@link String#valueOf(Object)} on the field. The field cannot be a primitive or - * and array type. (Though we may revisit this in the future). - *
  • {@code "$N"}: Then the tag will be the result of calling {@link String#valueOf(Object)} - * on the Nth method parameter. Parameters are {@code 0} indexed so {@code "$1"} is the - * second parameter. The referenced parameter cannot be a primitive or an array type. - * (Though we may revisit this in the future). - *
- * - *

In general you should reference either {@code "this"} or {@code final} fields since - * in these cases we can cache the operations to decrease the cost of computing the tags. A side - * effect of this is that for such references we will not have their tags recalculated after the - * first time. Thus it is best to use immutable objects for tags. - */ - String computedTag() default ""; - - /** - * True if class with annotation is immutable and instrumentation must adhere to this restriction. - * If enableSampling is passed as argument to the agent, instrumentation points with - * immutable = true are ignored. - */ - boolean immutable() default false; -} - diff --git a/core/src/main/java/io/grpc/perfmark/PerfTag.java b/core/src/main/java/io/grpc/perfmark/PerfTag.java deleted file mode 100644 index 3dfe6e3386..0000000000 --- a/core/src/main/java/io/grpc/perfmark/PerfTag.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.perfmark; - -import javax.annotation.Nullable; -import javax.annotation.concurrent.Immutable; - -/** - * A Tag is used to provide additional information to identify a task and consists of a 64-bit - * integer value and a string. - * - *

Both the {@code numericTag} and the {@code stringTag} are optional. The {@code numericTag} - * value can be used to identify the specific task being worked on (e.g. the id of the rpc call). - * The {@code stringTag} can be used to store any value that is not a compile-time constant (a - * restriction imposed for the name passed to PerfMark tasks and events). A value of 0 for the - * {@code numericTag} is considered null. Don't use 0 for the {@code numericTag} unless you intend - * to specify null. In that case you are encouraged to use {@link #create(String)}. - * - *

Invocations to {@code create} methods in this class are a no-op unless PerfMark - * instrumentation is enabled. If so, calls to {@code create} methods to this class are replaced for - * calls to {@link TagFactory} create methods. - * - *

This class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this - * yet. - */ -@Immutable -public final class PerfTag { - - private static final long NULL_NUMERIC_TAG = 0; - private static final String NULL_STRING_TAG = null; - - private final long numericTag; - private final String stringTag; - - private PerfTag(long numericTag, @Nullable String stringTag) { - this.numericTag = numericTag; - this.stringTag = stringTag; - } - - /** Returns the numeric tag if set, or {@link #NULL_NUMERIC_TAG} instead. */ - public long getNumericTag() { - return numericTag; - } - - /** Returns the string tag if set, or {@link #NULL_STRING_TAG} instead. */ - @Nullable public String getStringTag() { - return stringTag; - } - - @Override - public String toString() { - return "Tag(numericTag=" + numericTag + ",stringTag='" + stringTag + "')"; - } - - @Override - public int hashCode() { - int longHashCode = (int)(numericTag ^ (numericTag >>> 32)); - return longHashCode + (stringTag != null ? stringTag.hashCode() : 31); - } - - @Override - @SuppressWarnings("ReferenceEquality") // No Java 8 yet. - public boolean equals(Object obj) { - if (!(obj instanceof PerfTag)) { - return false; - } - PerfTag that = (PerfTag) obj; - return numericTag == that.numericTag - && (stringTag == that.stringTag || (stringTag != null && stringTag.equals(that.stringTag))); - } - - /** - * Provides methods that create Tag instances which should not be directly invoked by clients. - * - *

Calls to {@link PerfMark#create(long)}, {@link PerfMark#create(long, String)} and {@link - * PerfMark#create(String)} are replaced with calls to the methods in this class using bytecode - * rewriting, if enabled. - */ - static final class TagFactory { - /** - * This class should not be instantiated. - */ - private TagFactory() { - throw new AssertionError("nope"); - } - - public static PerfTag create(long numericTag, String stringTag) { - return new PerfTag(numericTag, stringTag); - } - - public static PerfTag create(String stringTag) { - return new PerfTag(NULL_NUMERIC_TAG, stringTag); - } - - public static PerfTag create(long numericTag) { - return new PerfTag(numericTag, NULL_STRING_TAG); - } - - static PerfTag create() { - return new PerfTag(NULL_NUMERIC_TAG, NULL_STRING_TAG); - } - } -} - diff --git a/core/src/main/java/io/grpc/perfmark/package-info.java b/core/src/main/java/io/grpc/perfmark/package-info.java deleted file mode 100644 index 1f0b4dab70..0000000000 --- a/core/src/main/java/io/grpc/perfmark/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This is an internal, experimental API and not subject to the normal compatibility guarantees. - * - * @see io.grpc.Internal - */ -@javax.annotation.CheckReturnValue -@javax.annotation.ParametersAreNonnullByDefault -package io.grpc.perfmark; diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index ea34d4ce35..a4c0da2d69 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -43,6 +43,7 @@ import io.grpc.ServerCall; import io.grpc.Status; import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl; import io.grpc.internal.testing.SingleMessageProducer; +import io.perfmark.PerfMark; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.InputStreamReader; @@ -90,7 +91,7 @@ public class ServerCallImplTest { context = Context.ROOT.withCancellation(); call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer); + serverCallTracer, PerfMark.createTag()); } @Test @@ -113,7 +114,7 @@ public class ServerCallImplTest { call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - tracer); + tracer, PerfMark.createTag()); // required boilerplate call.sendHeaders(new Metadata()); @@ -224,7 +225,8 @@ public class ServerCallImplTest { context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer); + serverCallTracer, + PerfMark.createTag()); serverCall.sendHeaders(new Metadata()); serverCall.sendMessage(1L); verify(stream, times(1)).writeMessage(any(InputStream.class)); @@ -258,7 +260,8 @@ public class ServerCallImplTest { context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer); + serverCallTracer, + PerfMark.createTag()); serverCall.sendHeaders(new Metadata()); serverCall.sendMessage(1L); serverCall.sendMessage(1L); @@ -295,7 +298,8 @@ public class ServerCallImplTest { context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer); + serverCallTracer, + PerfMark.createTag()); serverCall.close(Status.OK, new Metadata()); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); verify(stream, times(1)).cancel(statusCaptor.capture()); diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 23042890a2..ea6c43a264 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -77,6 +77,7 @@ import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener; import io.grpc.internal.testing.SingleMessageProducer; import io.grpc.internal.testing.TestServerStreamTracer; import io.grpc.util.MutableHandlerRegistry; +import io.perfmark.PerfMark; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; @@ -1140,7 +1141,8 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation()); + Context.ROOT.withCancellation(), + PerfMark.createTag()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1165,7 +1167,8 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation()); + Context.ROOT.withCancellation(), + PerfMark.createTag()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1190,7 +1193,8 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation()); + Context.ROOT.withCancellation(), + PerfMark.createTag()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1213,7 +1217,8 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation()); + Context.ROOT.withCancellation(), + PerfMark.createTag()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1236,7 +1241,8 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation()); + Context.ROOT.withCancellation(), + PerfMark.createTag()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1259,7 +1265,8 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation()); + Context.ROOT.withCancellation(), + PerfMark.createTag()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); diff --git a/netty/BUILD.bazel b/netty/BUILD.bazel index 30f02bd985..9680660067 100644 --- a/netty/BUILD.bazel +++ b/netty/BUILD.bazel @@ -25,5 +25,6 @@ java_library( "@io_netty_netty_handler_proxy//jar", "@io_netty_netty_resolver//jar", "@io_netty_netty_transport//jar", + "@io_perfmark_perfmark_api//jar", ], ) diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index ba49010c6b..0949faa981 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -43,6 +43,7 @@ import io.netty.channel.EventLoop; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Stream; import io.netty.util.AsciiString; +import io.perfmark.PerfMark; import javax.annotation.Nullable; /** @@ -114,8 +115,18 @@ class NettyClientStream extends AbstractClientStream { } private class Sink implements AbstractClientStream.Sink { + @Override public void writeHeaders(Metadata headers, byte[] requestPayload) { + PerfMark.startTask("NettyClientStream$Sink.writeHeaders"); + try { + writeHeadersInternal(headers, requestPayload); + } finally { + PerfMark.stopTask("NettyClientStream$Sink.writeHeaders"); + } + } + + private void writeHeadersInternal(Metadata headers, byte[] requestPayload) { // Convert the headers into Netty HTTP/2 headers. AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method); if (defaultPath == null) { @@ -152,15 +163,13 @@ class NettyClientStream extends AbstractClientStream { } } }; - // Write the command requesting the creation of the stream. writeQueue.enqueue( new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get), !method.getType().clientSendsOneMessage() || get).addListener(failureListener); } - @Override - public void writeFrame( + private void writeFrameInternal( WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) { Preconditions.checkArgument(numMessages >= 0); ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf(); @@ -184,12 +193,23 @@ class NettyClientStream extends AbstractClientStream { }); } else { // The frame is empty and will not impact outbound flow control. Just send it. - writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush); + writeQueue.enqueue( + new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush); } } @Override - public void request(final int numMessages) { + public void writeFrame( + WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { + PerfMark.startTask("NettyClientStream$Sink.writeFrame"); + try { + writeFrameInternal(frame, endOfStream, flush, numMessages); + } finally { + PerfMark.stopTask("NettyClientStream$Sink.writeFrame"); + } + } + + private void requestInternal(final int numMessages) { if (channel.eventLoop().inEventLoop()) { // Processing data read in the event loop so can call into the deframer immediately transportState().requestMessagesFromDeframer(numMessages); @@ -203,9 +223,24 @@ class NettyClientStream extends AbstractClientStream { } } + @Override + public void request(int numMessages) { + PerfMark.startTask("NettyClientStream$Sink.request"); + try { + requestInternal(numMessages); + } finally { + PerfMark.stopTask("NettyClientStream$Sink.request"); + } + } + @Override public void cancel(Status status) { - writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true); + PerfMark.startTask("NettyClientStream$Sink.cancel"); + try { + writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true); + } finally { + PerfMark.stopTask("NettyClientStream$Sink.cancel"); + } } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index e9d221aa28..64ab6a3bda 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -33,6 +33,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.EventLoop; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Stream; +import io.perfmark.PerfMark; import java.util.logging.Level; import java.util.logging.Logger; @@ -91,8 +92,8 @@ class NettyServerStream extends AbstractServerStream { } private class Sink implements AbstractServerStream.Sink { - @Override - public void request(final int numMessages) { + + private void requestInternal(final int numMessages) { if (channel.eventLoop().inEventLoop()) { // Processing data read in the event loop so can call into the deframer immediately transportState().requestMessagesFromDeframer(numMessages); @@ -107,16 +108,30 @@ class NettyServerStream extends AbstractServerStream { } @Override - public void writeHeaders(Metadata headers) { - writeQueue.enqueue( - SendResponseHeadersCommand.createHeaders( - transportState(), - Utils.convertServerHeaders(headers)), - true); + public void request(final int numMessages) { + PerfMark.startTask("NettyServerStream$Sink.request"); + try { + requestInternal(numMessages); + } finally { + PerfMark.stopTask("NettyServerStream$Sink.request"); + } } @Override - public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) { + public void writeHeaders(Metadata headers) { + PerfMark.startTask("NettyServerStream$Sink.writeHeaders"); + try { + writeQueue.enqueue( + SendResponseHeadersCommand.createHeaders( + transportState(), + Utils.convertServerHeaders(headers)), + true); + } finally { + PerfMark.stopTask("NettyServerStream$Sink.writeHeaders"); + } + } + + private void writeFrameInternal(WritableBuffer frame, boolean flush, final int numMessages) { Preconditions.checkArgument(numMessages >= 0); if (frame == null) { writeQueue.scheduleFlush(); @@ -140,17 +155,37 @@ class NettyServerStream extends AbstractServerStream { }); } + @Override + public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) { + PerfMark.startTask("NettyServerStream$Sink.writeFrame"); + try { + writeFrameInternal(frame, flush, numMessages); + } finally { + PerfMark.stopTask("NettyServerStream$Sink.writeFrame"); + } + } + @Override public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { - Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent); - writeQueue.enqueue( - SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status), - true); + PerfMark.startTask("NettyServerStream$Sink.writeTrailers"); + try { + Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent); + writeQueue.enqueue( + SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status), + true); + } finally { + PerfMark.stopTask("NettyServerStream$Sink.writeTrailers"); + } } @Override public void cancel(Status status) { - writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true); + PerfMark.startTask("NettyServerStream$Sink.cancel"); + try { + writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true); + } finally { + PerfMark.startTask("NettyServerStream$Sink.cancel"); + } } } diff --git a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java index 71e6bcfe29..4a343e8d6e 100644 --- a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java +++ b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java @@ -21,6 +21,8 @@ import io.netty.buffer.ByteBufHolder; import io.netty.buffer.DefaultByteBufHolder; import io.netty.channel.Channel; import io.netty.channel.ChannelPromise; +import io.perfmark.Link; +import io.perfmark.PerfMark; /** * Command sent from the transport to the Netty channel to send a GRPC frame to the remote endpoint. @@ -28,6 +30,7 @@ import io.netty.channel.ChannelPromise; final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.QueuedCommand { private final StreamIdHolder stream; private final boolean endStream; + private final Link link; private ChannelPromise promise; @@ -35,6 +38,12 @@ final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQu super(content); this.stream = stream; this.endStream = endStream; + this.link = PerfMark.link(); + } + + @Override + public Link getLink() { + return link; } int streamId() { diff --git a/netty/src/main/java/io/grpc/netty/WriteQueue.java b/netty/src/main/java/io/grpc/netty/WriteQueue.java index 24063ad181..f3ef7ce0e1 100644 --- a/netty/src/main/java/io/grpc/netty/WriteQueue.java +++ b/netty/src/main/java/io/grpc/netty/WriteQueue.java @@ -22,6 +22,8 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; +import io.perfmark.Link; +import io.perfmark.PerfMark; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -104,26 +106,44 @@ class WriteQueue { * called in the event loop */ private void flush() { + PerfMark.startTask("WriteQueue.periodicFlush"); try { QueuedCommand cmd; int i = 0; boolean flushedOnce = false; while ((cmd = queue.poll()) != null) { - cmd.run(channel); + PerfMark.startTask("WriteQueue.run"); + try { + cmd.getLink().link(); + cmd.run(channel); + } finally { + PerfMark.stopTask("WriteQueue.run"); + } if (++i == DEQUE_CHUNK_SIZE) { i = 0; // Flush each chunk so we are releasing buffers periodically. In theory this loop // might never end as new events are continuously added to the queue, if we never // flushed in that case we would be guaranteed to OOM. - channel.flush(); + PerfMark.startTask("WriteQueue.flush0"); + try { + channel.flush(); + } finally { + PerfMark.stopTask("WriteQueue.flush0"); + } flushedOnce = true; } } // Must flush at least once, even if there were no writes. if (i != 0 || !flushedOnce) { - channel.flush(); + PerfMark.startTask("WriteQueue.flush1"); + try { + channel.flush(); + } finally { + PerfMark.stopTask("WriteQueue.flush1"); + } } } finally { + PerfMark.stopTask("WriteQueue.periodicFlush"); // Mark the write as done, if the queue is non-empty after marking trigger a new write. scheduled.set(false); if (!queue.isEmpty()) { @@ -134,8 +154,10 @@ class WriteQueue { private static class RunnableCommand implements QueuedCommand { private final Runnable runnable; + private final Link link; public RunnableCommand(Runnable runnable) { + this.link = PerfMark.link(); this.runnable = runnable; } @@ -153,11 +175,21 @@ class WriteQueue { public final void run(Channel channel) { runnable.run(); } + + @Override + public Link getLink() { + return link; + } } abstract static class AbstractQueuedCommand implements QueuedCommand { private ChannelPromise promise; + private final Link link; + + AbstractQueuedCommand() { + this.link = PerfMark.link(); + } @Override public final void promise(ChannelPromise promise) { @@ -173,6 +205,11 @@ class WriteQueue { public final void run(Channel channel) { channel.write(this, promise); } + + @Override + public Link getLink() { + return link; + } } /** @@ -190,5 +227,7 @@ class WriteQueue { void promise(ChannelPromise promise); void run(Channel channel); + + Link getLink(); } } diff --git a/repositories.bzl b/repositories.bzl index 4fd98cec56..c77df33038 100644 --- a/repositories.bzl +++ b/repositories.bzl @@ -35,6 +35,7 @@ def grpc_java_repositories( omit_io_netty_tcnative_boringssl_static = False, omit_io_opencensus_api = False, omit_io_opencensus_grpc_metrics = False, + omit_io_perfmark = False, omit_javax_annotation = False, omit_junit_junit = False, omit_net_zlib = False, @@ -103,6 +104,8 @@ def grpc_java_repositories( io_opencensus_api() if not omit_io_opencensus_grpc_metrics: io_opencensus_grpc_metrics() + if not omit_io_perfmark: + io_perfmark() if not omit_javax_annotation: javax_annotation() if not omit_junit_junit: @@ -402,6 +405,15 @@ def io_opencensus_grpc_metrics(): licenses = ["notice"], # Apache 2.0 ) +def io_perfmark(): + jvm_maven_import_external( + name = "io_perfmark_perfmark_api", + artifact = "io.perfmark:perfmark-api:0.16.0", + server_urls = ["http://central.maven.org/maven2"], + artifact_sha256 = "a93667875ea9d10315177768739a18d6c667df041c982d2841645ae8558d0af0", + licenses = ["notice"], # Apache 2.0 + ) + def javax_annotation(): # Use //stub:javax_annotation for neverlink=1 support. jvm_maven_import_external(