diff --git a/build.gradle b/build.gradle index b7b68aa611..188c5768cc 100644 --- a/build.gradle +++ b/build.gradle @@ -201,6 +201,7 @@ subprojects { opencensus_impl: "io.opencensus:opencensus-impl:${opencensusVersion}", opencensus_impl_lite: "io.opencensus:opencensus-impl-lite:${opencensusVersion}", instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3', + perfmark: 'io.perfmark:perfmark-api:0.16.0', protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}", protobuf_lite: "com.google.protobuf:protobuf-lite:3.0.1", protoc_lite: "com.google.protobuf:protoc-gen-javalite:3.0.0", diff --git a/core/BUILD.bazel b/core/BUILD.bazel index 3a22b50aac..4374e7b7d9 100644 --- a/core/BUILD.bazel +++ b/core/BUILD.bazel @@ -1,16 +1,3 @@ -PERFMARK_INTERNAL_ACCESSOR_SRCS = glob( - [ - "src/main/java/io/grpc/perfmark/Internal*.java", - ], -) - -PERFMARK_SRCS = glob( - [ - "src/main/java/io/grpc/perfmark/*.java", - ], - exclude = PERFMARK_INTERNAL_ACCESSOR_SRCS, -) - java_library( name = "core", visibility = ["//visibility:public"], @@ -43,7 +30,6 @@ java_library( ]), visibility = ["//:__subpackages__"], deps = [ - ":perfmark", "//api", "//context", "@com_google_android_annotations//jar", @@ -54,6 +40,7 @@ java_library( "@com_google_j2objc_j2objc_annotations//jar", "@io_opencensus_opencensus_api//jar", "@io_opencensus_opencensus_contrib_grpc_metrics//jar", + "@io_perfmark_perfmark_api//jar", "@org_codehaus_mojo_animal_sniffer_annotations//jar", ], ) @@ -76,12 +63,3 @@ java_library( ], ) -java_library( - name = "perfmark", - srcs = PERFMARK_SRCS, - visibility = ["//:__subpackages__"], - deps = [ - "@com_google_code_findbugs_jsr305//jar", - "@com_google_errorprone_error_prone_annotations//jar", - ], -) diff --git a/core/build.gradle b/core/build.gradle index f57c084f13..b02e42a01e 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -4,6 +4,7 @@ dependencies { compile project(':grpc-api'), libraries.gson, libraries.android_annotations, + libraries.perfmark compile (libraries.opencensus_api) { // prefer 3.0.2 from libraries instead of 3.0.1 exclude group: 'com.google.code.findbugs', module: 'jsr305' diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 7e4d8a8246..9ff010bbe9 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -46,8 +46,9 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; -import io.grpc.perfmark.PerfMark; -import io.grpc.perfmark.PerfTag; +import io.perfmark.Link; +import io.perfmark.PerfMark; +import io.perfmark.Tag; import java.io.InputStream; import java.nio.charset.Charset; import java.util.concurrent.CancellationException; @@ -69,7 +70,7 @@ final class ClientCallImpl extends ClientCall { = "gzip".getBytes(Charset.forName("US-ASCII")); private final MethodDescriptor method; - private final PerfTag tag; + private final Tag tag; private final Executor callExecutor; private final CallTracer channelCallsTracer; private final Context context; @@ -96,7 +97,7 @@ final class ClientCallImpl extends ClientCall { boolean retryEnabled) { this.method = method; // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl. - this.tag = PerfMark.createTag(method.getFullMethodName()); + this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this)); // If we know that the executor is a direct executor, we don't need to wrap it with a // SerializingExecutor. This is purely for performance reasons. // See https://github.com/grpc/grpc-java/issues/368 @@ -112,6 +113,7 @@ final class ClientCallImpl extends ClientCall { this.clientTransportProvider = clientTransportProvider; this.deadlineCancellationExecutor = deadlineCancellationExecutor; this.retryEnabled = retryEnabled; + PerfMark.event("ClientCall.", tag); } private final class ContextCancellationListener implements CancellationListener { @@ -183,11 +185,11 @@ final class ClientCallImpl extends ClientCall { @Override public void start(Listener observer, Metadata headers) { - PerfMark.taskStart(tag, "ClientCall.start"); + PerfMark.startTask("ClientCall.start", tag); try { startInternal(observer, headers); } finally { - PerfMark.taskEnd(tag, "ClientCall.start"); + PerfMark.stopTask("ClientCall.start", tag); } } @@ -378,18 +380,23 @@ final class ClientCallImpl extends ClientCall { @Override public void request(int numMessages) { - checkState(stream != null, "Not started"); - checkArgument(numMessages >= 0, "Number requested must be non-negative"); - stream.request(numMessages); + PerfMark.startTask("ClientCall.request", tag); + try { + checkState(stream != null, "Not started"); + checkArgument(numMessages >= 0, "Number requested must be non-negative"); + stream.request(numMessages); + } finally { + PerfMark.stopTask("ClientCall.cancel", tag); + } } @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { - PerfMark.taskStart(tag, "ClientCall.cancel"); + PerfMark.startTask("ClientCall.cancel", tag); try { cancelInternal(message, cause); } finally { - PerfMark.taskEnd(tag, "ClientCall.cancel"); + PerfMark.stopTask("ClientCall.cancel", tag); } } @@ -424,11 +431,11 @@ final class ClientCallImpl extends ClientCall { @Override public void halfClose() { - PerfMark.taskStart(tag, "ClientCall.halfClose"); + PerfMark.startTask("ClientCall.halfClose", tag); try { halfCloseInternal(); } finally { - PerfMark.taskEnd(tag, "ClientCall.halfClose"); + PerfMark.stopTask("ClientCall.halfClose", tag); } } @@ -442,11 +449,11 @@ final class ClientCallImpl extends ClientCall { @Override public void sendMessage(ReqT message) { - PerfMark.taskStart(tag, "ClientCall.sendMessage"); + PerfMark.startTask("ClientCall.sendMessage", tag); try { sendMessageInternal(message); } finally { - PerfMark.taskEnd(tag, "ClientCall.sendMessage"); + PerfMark.stopTask("ClientCall.sendMessage", tag); } } @@ -515,17 +522,29 @@ final class ClientCallImpl extends ClientCall { @Override public void headersRead(final Metadata headers) { + PerfMark.startTask("ClientStreamListener.headersRead", tag); + final Link link = PerfMark.link(); + final class HeadersRead extends ContextRunnable { HeadersRead() { super(context); } @Override - public final void runInContext() { + public void runInContext() { + PerfMark.startTask("ClientCall$Listener.headersRead", tag); + link.link(); + try { + runInternal(); + } finally { + PerfMark.stopTask("ClientCall$Listener.headersRead", tag); + } + } + + private void runInternal() { if (closed) { return; } - PerfMark.taskStart(tag, "ClientCall.headersRead"); try { observer.onHeaders(headers); } catch (Throwable t) { @@ -533,29 +552,43 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to read headers"); stream.cancel(status); close(status, new Metadata()); - } finally { - PerfMark.taskEnd(tag, "ClientCall.headersRead"); } } } - callExecutor.execute(new HeadersRead()); + try { + callExecutor.execute(new HeadersRead()); + } finally { + PerfMark.stopTask("ClientStreamListener.headersRead", tag); + } } @Override public void messagesAvailable(final MessageProducer producer) { + PerfMark.startTask("ClientStreamListener.messagesAvailable", tag); + final Link link = PerfMark.link(); + final class MessagesAvailable extends ContextRunnable { MessagesAvailable() { super(context); } @Override - public final void runInContext() { + public void runInContext() { + PerfMark.startTask("ClientCall$Listener.messagesAvailable", tag); + link.link(); + try { + runInternal(); + } finally { + PerfMark.stopTask("ClientCall$Listener.messagesAvailable", tag); + } + } + + private void runInternal() { if (closed) { GrpcUtil.closeQuietly(producer); return; } - PerfMark.taskStart(tag, "ClientCall.messagesAvailable"); try { InputStream message; while ((message = producer.next()) != null) { @@ -573,13 +606,15 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to read message."); stream.cancel(status); close(status, new Metadata()); - } finally { - PerfMark.taskEnd(tag, "ClientCall.messagesAvailable"); } } } - callExecutor.execute(new MessagesAvailable()); + try { + callExecutor.execute(new MessagesAvailable()); + } finally { + PerfMark.stopTask("ClientStreamListener.messagesAvailable", tag); + } } /** @@ -603,6 +638,16 @@ final class ClientCallImpl extends ClientCall { @Override public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { + PerfMark.startTask("ClientStreamListener.closed", tag); + try { + closedInternal(status, rpcProgress, trailers); + } finally { + PerfMark.stopTask("ClientStreamListener.closed", tag); + } + } + + private void closedInternal( + Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) { Deadline deadline = effectiveDeadline(); if (status.getCode() == Status.Code.CANCELLED && deadline != null) { // When the server's deadline expires, it can only reset the stream with CANCEL and no @@ -616,23 +661,29 @@ final class ClientCallImpl extends ClientCall { } final Status savedStatus = status; final Metadata savedTrailers = trailers; + final Link link = PerfMark.link(); final class StreamClosed extends ContextRunnable { StreamClosed() { super(context); } @Override - public final void runInContext() { + public void runInContext() { + PerfMark.startTask("ClientCall$Listener.onClose", tag); + link.link(); + try { + runInternal(); + } finally { + PerfMark.stopTask("ClientCall$Listener.onClose", tag); + } + } + + private void runInternal() { if (closed) { // We intentionally don't keep the status or metadata from the server. return; } - PerfMark.taskStart(tag, "ClientCall.closed"); - try { - close(savedStatus, savedTrailers); - } finally { - PerfMark.taskEnd(tag, "ClientCall.closed"); - } + close(savedStatus, savedTrailers); } } @@ -641,14 +692,26 @@ final class ClientCallImpl extends ClientCall { @Override public void onReady() { + PerfMark.startTask("ClientStreamListener.onReady", tag); + final Link link = PerfMark.link(); + final class StreamOnReady extends ContextRunnable { StreamOnReady() { super(context); } @Override - public final void runInContext() { - PerfMark.taskStart(tag, "ClientCall.onReady"); + public void runInContext() { + PerfMark.startTask("ClientCall$Listener.onReady", tag); + link.link(); + try { + runInternal(); + } finally { + PerfMark.stopTask("ClientCall$Listener.onReady", tag); + } + } + + private void runInternal() { try { observer.onReady(); } catch (Throwable t) { @@ -656,13 +719,15 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."); stream.cancel(status); close(status, new Metadata()); - } finally { - PerfMark.taskEnd(tag, "ClientCall.onReady"); } } } - callExecutor.execute(new StreamOnReady()); + try { + callExecutor.execute(new StreamOnReady()); + } finally { + PerfMark.stopTask("ClientStreamListener.onReady", tag); + } } } } diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index ec50c76414..6f123e7667 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -37,8 +37,8 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.Status; -import io.grpc.perfmark.PerfMark; -import io.grpc.perfmark.PerfTag; +import io.perfmark.PerfMark; +import io.perfmark.Tag; import java.io.InputStream; import java.util.logging.Level; import java.util.logging.Logger; @@ -54,7 +54,7 @@ final class ServerCallImpl extends ServerCall { private final ServerStream stream; private final MethodDescriptor method; - private final PerfTag tag; + private final Tag tag; private final Context.CancellableContext context; private final byte[] messageAcceptEncoding; private final DecompressorRegistry decompressorRegistry; @@ -71,31 +71,35 @@ final class ServerCallImpl extends ServerCall { ServerCallImpl(ServerStream stream, MethodDescriptor method, Metadata inboundHeaders, Context.CancellableContext context, DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry, - CallTracer serverCallTracer) { + CallTracer serverCallTracer, Tag tag) { this.stream = stream; this.method = method; - // TODO(carl-mastrangelo): consider moving this to the ServerImpl to record startCall. - this.tag = PerfMark.createTag(method.getFullMethodName()); this.context = context; this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY); this.decompressorRegistry = decompressorRegistry; this.compressorRegistry = compressorRegistry; this.serverCallTracer = serverCallTracer; this.serverCallTracer.reportCallStarted(); + this.tag = tag; } @Override public void request(int numMessages) { - stream.request(numMessages); + PerfMark.startTask("ServerCall.request", tag); + try { + stream.request(numMessages); + } finally { + PerfMark.stopTask("ServerCall.request", tag); + } } @Override public void sendHeaders(Metadata headers) { - PerfMark.taskStart(tag, "ServerCall.sendHeaders"); + PerfMark.startTask("ServerCall.sendHeaders", tag); try { sendHeadersInternal(headers); } finally { - PerfMark.taskEnd(tag, "ServerCall.sendHeaders"); + PerfMark.stopTask("ServerCall.sendHeaders", tag); } } @@ -140,11 +144,11 @@ final class ServerCallImpl extends ServerCall { @Override public void sendMessage(RespT message) { - PerfMark.taskStart(tag, "ServerCall.sendMessage"); + PerfMark.startTask("ServerCall.sendMessage", tag); try { sendMessageInternal(message); } finally { - PerfMark.taskEnd(tag, "ServerCall.sendMessage"); + PerfMark.stopTask("ServerCall.sendMessage", tag); } } @@ -193,11 +197,11 @@ final class ServerCallImpl extends ServerCall { @Override public void close(Status status, Metadata trailers) { - PerfMark.taskStart(tag, "ServerCall.close"); + PerfMark.startTask("ServerCall.close", tag); try { closeInternal(status, trailers); } finally { - PerfMark.taskEnd(tag, "ServerCall.close"); + PerfMark.stopTask("ServerCall.close", tag); } } @@ -281,15 +285,23 @@ final class ServerCallImpl extends ServerCall { MoreExecutors.directExecutor()); } - @SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try @Override - public void messagesAvailable(final MessageProducer producer) { + public void messagesAvailable(MessageProducer producer) { + PerfMark.startTask("ServerStreamListener.messagesAvailable", call.tag); + try { + messagesAvailableInternal(producer); + } finally { + PerfMark.stopTask("ServerStreamListener.messagesAvailable", call.tag); + } + } + + @SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try + private void messagesAvailableInternal(final MessageProducer producer) { if (call.cancelled) { GrpcUtil.closeQuietly(producer); return; } - PerfMark.taskStart(call.tag, "ServerCall.messagesAvailable"); InputStream message; try { while ((message = producer.next()) != null) { @@ -305,58 +317,58 @@ final class ServerCallImpl extends ServerCall { GrpcUtil.closeQuietly(producer); Throwables.throwIfUnchecked(t); throw new RuntimeException(t); - } finally { - PerfMark.taskEnd(call.tag, "ServerCall.messagesAvailable"); } } @Override public void halfClosed() { - if (call.cancelled) { - return; - } - - PerfMark.taskStart(call.tag, "ServerCall.halfClosed"); - + PerfMark.startTask("ServerStreamListener.halfClosed", call.tag); try { + if (call.cancelled) { + return; + } + listener.onHalfClose(); } finally { - PerfMark.taskEnd(call.tag, "ServerCall.halfClosed"); + PerfMark.stopTask("ServerStreamListener.halfClosed", call.tag); } } @Override public void closed(Status status) { - PerfMark.taskStart(call.tag, "ServerCall.closed"); + PerfMark.startTask("ServerStreamListener.closed", call.tag); try { - try { - if (status.isOk()) { - listener.onComplete(); - } else { - call.cancelled = true; - listener.onCancel(); - } - } finally { - // Cancel context after delivering RPC closure notification to allow the application to - // clean up and update any state based on whether onComplete or onCancel was called. - context.cancel(null); + closedInternal(status); + } finally { + PerfMark.stopTask("ServerStreamListener.closed", call.tag); + } + } + private void closedInternal(Status status) { + try { + if (status.isOk()) { + listener.onComplete(); + } else { + call.cancelled = true; + listener.onCancel(); } } finally { - PerfMark.taskEnd(call.tag, "ServerCall.closed"); + // Cancel context after delivering RPC closure notification to allow the application to + // clean up and update any state based on whether onComplete or onCancel was called. + context.cancel(null); } } @Override public void onReady() { - if (call.cancelled) { - return; - } - PerfMark.taskStart(call.tag, "ServerCall.closed"); + PerfMark.startTask("ServerStreamListener.onReady", call.tag); try { + if (call.cancelled) { + return; + } listener.onReady(); } finally { - PerfMark.taskEnd(call.tag, "ServerCall.closed"); + PerfMark.stopTask("ServerCall.closed", call.tag); } } } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 4e4c3e1437..886fa0ff7e 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -50,6 +50,9 @@ import io.grpc.ServerMethodDefinition; import io.grpc.ServerServiceDefinition; import io.grpc.ServerTransportFilter; import io.grpc.Status; +import io.perfmark.Link; +import io.perfmark.PerfMark; +import io.perfmark.Tag; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; @@ -460,9 +463,21 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume transportClosed(transport); } + @Override - public void streamCreated( - final ServerStream stream, final String methodName, final Metadata headers) { + public void streamCreated(ServerStream stream, String methodName, Metadata headers) { + Tag tag = PerfMark.createTag(methodName, stream.hashCode()); + PerfMark.startTask("ServerTransportListener.streamCreated", tag); + try { + streamCreatedInternal(stream, methodName, headers, tag); + } finally { + PerfMark.stopTask("ServerTransportListener.streamCreated", tag); + } + } + + private void streamCreatedInternal( + final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) { + if (headers.containsKey(MESSAGE_ENCODING_KEY)) { String encoding = headers.get(MESSAGE_ENCODING_KEY); Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding); @@ -489,22 +504,33 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume wrappedExecutor = new SerializingExecutor(executor); } + final Link link = PerfMark.link(); + final JumpToApplicationThreadServerStreamListener jumpListener = new JumpToApplicationThreadServerStreamListener( - wrappedExecutor, executor, stream, context); + wrappedExecutor, executor, stream, context, tag); stream.setListener(jumpListener); // Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks // are delivered, including any errors. Callbacks can still be triggered, but they will be // queued. final class StreamCreated extends ContextRunnable { - StreamCreated() { super(context); } @Override public void runInContext() { + PerfMark.startTask("ServerTransportListener$StreamCreated.startCall", tag); + link.link(); + try { + runInternal(); + } finally { + PerfMark.stopTask("ServerTransportListener$StreamCreated.startCall", tag); + } + } + + private void runInternal() { ServerStreamListener listener = NOOP_LISTENER; try { ServerMethodDefinition method = registry.lookupMethod(methodName); @@ -523,7 +549,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume context.cancel(null); return; } - listener = startCall(stream, methodName, method, headers, context, statsTraceCtx); + listener = + startCall(stream, methodName, method, headers, context, statsTraceCtx, tag); } catch (RuntimeException e) { stream.close(Status.fromThrowable(e), new Metadata()); context.cancel(null); @@ -573,7 +600,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume /** Never returns {@code null}. */ private ServerStreamListener startCall(ServerStream stream, String fullMethodName, ServerMethodDefinition methodDef, Metadata headers, - Context.CancellableContext context, StatsTraceContext statsTraceCtx) { + Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) { // TODO(ejona86): should we update fullMethodName to have the canonical path of the method? statsTraceCtx.serverCallStarted( new ServerCallInfoImpl<>( @@ -587,7 +614,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume ServerMethodDefinition interceptedDef = methodDef.withServerCallHandler(handler); ServerMethodDefinition wMethodDef = binlog == null ? interceptedDef : binlog.wrapMethodDefinition(interceptedDef); - return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context); + return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag); } private ServerStreamListener startWrappedCall( @@ -595,7 +622,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume ServerMethodDefinition methodDef, ServerStream stream, Metadata headers, - Context.CancellableContext context) { + Context.CancellableContext context, + Tag tag) { ServerCallImpl call = new ServerCallImpl<>( stream, @@ -604,7 +632,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume context, decompressorRegistry, compressorRegistry, - serverCallTracer); + serverCallTracer, + tag); ServerCall.Listener listener = methodDef.getServerCallHandler().startCall(call, headers); @@ -687,15 +716,17 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume private final Executor cancelExecutor; private final Context.CancellableContext context; private final ServerStream stream; + private final Tag tag; // Only accessed from callExecutor. private ServerStreamListener listener; public JumpToApplicationThreadServerStreamListener(Executor executor, - Executor cancelExecutor, ServerStream stream, Context.CancellableContext context) { + Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) { this.callExecutor = executor; this.cancelExecutor = cancelExecutor; this.stream = stream; this.context = context; + this.tag = tag; } /** @@ -725,6 +756,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void messagesAvailable(final MessageProducer producer) { + PerfMark.startTask("ServerStreamListener.messagesAvailable", tag); + final Link link = PerfMark.link(); final class MessagesAvailable extends ContextRunnable { @@ -734,6 +767,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { + PerfMark.startTask("ServerCallListener(app).messagesAvailable", tag); + link.link(); try { getListener().messagesAvailable(producer); } catch (RuntimeException e) { @@ -742,15 +777,24 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume } catch (Error e) { internalClose(); throw e; + } finally { + PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag); } } } - callExecutor.execute(new MessagesAvailable()); + try { + callExecutor.execute(new MessagesAvailable()); + } finally { + PerfMark.stopTask("ServerStreamListener.messagesAvailable", tag); + } } @Override public void halfClosed() { + PerfMark.startTask("ServerStreamListener.halfClosed", tag); + final Link link = PerfMark.link(); + final class HalfClosed extends ContextRunnable { HalfClosed() { super(context); @@ -758,6 +802,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { + PerfMark.startTask("ServerCallListener(app).halfClosed", tag); + link.link(); try { getListener().halfClosed(); } catch (RuntimeException e) { @@ -766,15 +812,30 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume } catch (Error e) { internalClose(); throw e; + } finally { + PerfMark.stopTask("ServerCallListener(app).halfClosed", tag); } } } - callExecutor.execute(new HalfClosed()); + try { + callExecutor.execute(new HalfClosed()); + } finally { + PerfMark.stopTask("ServerStreamListener.halfClosed", tag); + } } @Override public void closed(final Status status) { + PerfMark.startTask("ServerStreamListener.closed", tag); + try { + closedInternal(status); + } finally { + PerfMark.stopTask("ServerStreamListener.closed", tag); + } + } + + private void closedInternal(final Status status) { // For cancellations, promptly inform any users of the context that their work should be // aborted. Otherwise, we can wait until pending work is done. if (!status.isOk()) { @@ -782,6 +843,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume // is not serializing. cancelExecutor.execute(new ContextCloser(context, status.getCause())); } + final Link link = PerfMark.link(); final class Closed extends ContextRunnable { Closed() { @@ -790,7 +852,13 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { - getListener().closed(status); + PerfMark.startTask("ServerCallListener(app).closed", tag); + link.link(); + try { + getListener().closed(status); + } finally { + PerfMark.stopTask("ServerCallListener(app).closed", tag); + } } } @@ -799,6 +867,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void onReady() { + PerfMark.startTask("ServerStreamListener.onReady", tag); + final Link link = PerfMark.link(); final class OnReady extends ContextRunnable { OnReady() { super(context); @@ -806,6 +876,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { + PerfMark.startTask("ServerCallListener(app).onReady", tag); + link.link(); try { getListener().onReady(); } catch (RuntimeException e) { @@ -814,11 +886,17 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume } catch (Error e) { internalClose(); throw e; + } finally { + PerfMark.stopTask("ServerCallListener(app).onReady", tag); } } } - callExecutor.execute(new OnReady()); + try { + callExecutor.execute(new OnReady()); + } finally { + PerfMark.stopTask("ServerStreamListener.onReady", tag); + } } } diff --git a/core/src/main/java/io/grpc/perfmark/InternalPerfMark.java b/core/src/main/java/io/grpc/perfmark/InternalPerfMark.java deleted file mode 100644 index 61a884380d..0000000000 --- a/core/src/main/java/io/grpc/perfmark/InternalPerfMark.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.perfmark; - - -/** - * Internal {@link PerfTag.TagFactory} and {@link PerfMarkTask} accessor. This is intended for use - * by io.grpc.perfmark, and the specifically supported packages that utilize PerfMark. If you - * *really* think you need to use this, contact the gRPC team first. - */ -public final class InternalPerfMark { - - private InternalPerfMark() {} - - /** Expose class to allow packages that utilize PerfMark to get PerfMarkTask instances. */ - public abstract static class InternalPerfMarkTask extends PerfMarkTask { - public InternalPerfMarkTask() {} - } - - /** Expose methods that create PerfTag to packages that utilize PerfMark. */ - private static final long NULL_NUMERIC_TAG = 0; - private static final String NULL_STRING_TAG = null; - - public static PerfTag createPerfTag(long numericTag, String stringTag) { - return PerfTag.TagFactory.create(numericTag, stringTag); - } - - public static PerfTag createPerfTag(String stringTag) { - return PerfTag.TagFactory.create(NULL_NUMERIC_TAG, stringTag); - } - - public static PerfTag createPerfTag(long numericTag) { - return PerfTag.TagFactory.create(numericTag, NULL_STRING_TAG); - } -} diff --git a/core/src/main/java/io/grpc/perfmark/PerfMark.java b/core/src/main/java/io/grpc/perfmark/PerfMark.java deleted file mode 100644 index f30363e7d4..0000000000 --- a/core/src/main/java/io/grpc/perfmark/PerfMark.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.perfmark; - -import com.google.errorprone.annotations.CompileTimeConstant; -import io.grpc.perfmark.PerfTag.TagFactory; - -/** - * PerfMark is a collection of stub methods for marking key points in the RPC lifecycle. This - * class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this yet. - */ -public final class PerfMark { - private PerfMark() { - throw new AssertionError("nope"); - } - - /** - * Start a Task with a Tag to identify it; a task represents some work that spans some time, and - * you are interested in both its start time and end time. - * - * @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't - * use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task - * does not have a numeric tag associated. In this case, you are encouraged to use {@link - * #taskStart(String)} or {@link PerfTag#create(String)}. - * @param taskName The name of the task. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this task. - */ - public static void taskStart(PerfTag tag, @CompileTimeConstant String taskName) {} - - /** - * Start a Task; a task represents some work that spans some time, and you are interested in both - * its start time and end time. - * - * @param taskName The name of the task. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this task. - */ - public static void taskStart(@CompileTimeConstant String taskName) {} - - /** - * End a Task with a Tag to identify it; a task represents some work that spans some time, and - * you are interested in both its start time and end time. - * - * @param tag a Tag object associated with the task start. This should be the tag used for the - * corresponding {@link #taskStart(PerfTag, String)} call. - * @param taskName The name of the task. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this task. This should - * be the name used by the corresponding {@link #taskStart(PerfTag, String)} call. - */ - public static void taskEnd(PerfTag tag, @CompileTimeConstant String taskName) {} - - /** - * End a Task with a Tag to identify it; a task represents some work that spans some time, and - * you are interested in both its start time and end time. - * - * @param taskName The name of the task. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this task. This should - * be the name used by the corresponding {@link #taskStart(String)} call. - */ - public static void taskEnd(@CompileTimeConstant String taskName) {} - - /** - * Start a Task with a Tag to identify it in a try-with-resource statement; a task represents some - * work that spans some time, and you are interested in both its start time and end time. - * - *

Use this in a try-with-resource statement so that task will end automatically. - * - * @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't - * use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task - * does not have a numeric tag associated. In this case, you are encouraged to use {@link - * #task(String)} or {@link PerfTag#create(String)}. - * @param taskName The name of the task. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this task. - */ - public static PerfMarkTask task(PerfTag tag, @CompileTimeConstant String taskName) { - return NoopTask.INSTANCE; - } - - /** - * Start a Task it in a try-with-resource statement; a task represents some work that spans some - * time, and you are interested in both its start time and end time. - * - *

Use this in a try-with-resource statement so that task will end automatically. - * - * @param taskName The name of the task. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this task. - */ - public static PerfMarkTask task(@CompileTimeConstant String taskName) { - return NoopTask.INSTANCE; - } - - /** - * Records an Event with a Tag to identify it. - * - *

An Event is different from a Task in that you don't care how much time it spanned. You are - * interested in only the time it happened. - * - * @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't - * use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task - * does not have a numeric tag associated. In this case, you are encouraged to use {@link - * #event(String)} or {@link PerfTag#create(String)}. - * @param eventName The name of the event. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this event. - */ - public static void event(PerfTag tag, @CompileTimeConstant String eventName) {} - - /** - * Records an Event. - * - *

An Event is different from a Task in that you don't care how much time it spanned. You are - * interested in only the time it happened. - * - * @param eventName The name of the event. This parameter must be a compile-time constant! - * Otherwise, instrumentation result will show "(invalid name)" for this event. - */ - public static void event(@CompileTimeConstant String eventName) {} - - /** - * If PerfMark instrumentation is not enabled, returns a Tag with numericTag = 0L. Replacement - * for {@link TagFactory#create(long, String)} if PerfMark agent is enabled. - * - */ - public static PerfTag createTag( - @SuppressWarnings("unused") long numericTag, @SuppressWarnings("unused") String stringTag) { - // Warning suppression is safe as this method returns by default the NULL_PERF_TAG - return NULL_PERF_TAG; - } - - /** - * If PerfMark instrumentation is not enabled returns a Tag with numericTag = 0L. Replacement - * for {@link TagFactory#create(String)} if PerfMark agent is enabled. - */ - public static PerfTag createTag(@SuppressWarnings("unused") String stringTag) { - // Warning suppression is safe as this method returns by default the NULL_PERF_TAG - return NULL_PERF_TAG; - } - - /** - * If PerfMark instrumentation is not enabled returns a Tag with numericTag = 0L. Replacement - * for {@link TagFactory#create(long)} if PerfMark agent is enabled. - */ - public static PerfTag createTag(@SuppressWarnings("unused") long numericTag) { - // Warning suppression is safe as this method returns by default the NULL_PERF_TAG - return NULL_PERF_TAG; - } - - private static final PerfTag NULL_PERF_TAG = TagFactory.create(); - - private static final class NoopTask extends PerfMarkTask { - - private static final PerfMarkTask INSTANCE = new NoopTask(); - - NoopTask() {} - - @Override - public void close() {} - } -} diff --git a/core/src/main/java/io/grpc/perfmark/PerfMarkTask.java b/core/src/main/java/io/grpc/perfmark/PerfMarkTask.java deleted file mode 100644 index 4a689d8ab1..0000000000 --- a/core/src/main/java/io/grpc/perfmark/PerfMarkTask.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.perfmark; - -import java.io.Closeable; - -/** - * This class exists to make it easier for users to use the try-with-resource shorthand for - * starting and ending a PerfMark Task. This class is {@link io.grpc.Internal} and - * {@link io.grpc.ExperimentalApi}. Do not use this yet. - */ -public abstract class PerfMarkTask implements Closeable { - @Override - public abstract void close(); - - PerfMarkTask() {} -} diff --git a/core/src/main/java/io/grpc/perfmark/PerfMarker.java b/core/src/main/java/io/grpc/perfmark/PerfMarker.java deleted file mode 100644 index b39b85f526..0000000000 --- a/core/src/main/java/io/grpc/perfmark/PerfMarker.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.perfmark; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Annotation to add PerfMark instrumentation points surrounding method invocation. - * - *

This class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this - * yet. - */ -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.METHOD) -// TODO(carl-mastrangelo): Add this line back in and make it okay on Android -//@IncompatibleModifiers(value = {Modifier.ABSTRACT, Modifier.NATIVE}) -public @interface PerfMarker { - - /** - * The name of the task; e.g. `parseHeaders`. - */ - String taskName(); - - /** - * An optional computed tag. - * - *

There are 3 supported references that can be used - *

    - *
  • {@code "this"}: Then the tag will be the {@link Object#toString} of the current class. - * Only valid for instance methods. - *
  • {@code "someFieldName"}: Then the tag will be the result of - * calling {@link String#valueOf(Object)} on the field. The field cannot be a primitive or - * and array type. (Though we may revisit this in the future). - *
  • {@code "$N"}: Then the tag will be the result of calling {@link String#valueOf(Object)} - * on the Nth method parameter. Parameters are {@code 0} indexed so {@code "$1"} is the - * second parameter. The referenced parameter cannot be a primitive or an array type. - * (Though we may revisit this in the future). - *
- * - *

In general you should reference either {@code "this"} or {@code final} fields since - * in these cases we can cache the operations to decrease the cost of computing the tags. A side - * effect of this is that for such references we will not have their tags recalculated after the - * first time. Thus it is best to use immutable objects for tags. - */ - String computedTag() default ""; - - /** - * True if class with annotation is immutable and instrumentation must adhere to this restriction. - * If enableSampling is passed as argument to the agent, instrumentation points with - * immutable = true are ignored. - */ - boolean immutable() default false; -} - diff --git a/core/src/main/java/io/grpc/perfmark/PerfTag.java b/core/src/main/java/io/grpc/perfmark/PerfTag.java deleted file mode 100644 index 3dfe6e3386..0000000000 --- a/core/src/main/java/io/grpc/perfmark/PerfTag.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.perfmark; - -import javax.annotation.Nullable; -import javax.annotation.concurrent.Immutable; - -/** - * A Tag is used to provide additional information to identify a task and consists of a 64-bit - * integer value and a string. - * - *

Both the {@code numericTag} and the {@code stringTag} are optional. The {@code numericTag} - * value can be used to identify the specific task being worked on (e.g. the id of the rpc call). - * The {@code stringTag} can be used to store any value that is not a compile-time constant (a - * restriction imposed for the name passed to PerfMark tasks and events). A value of 0 for the - * {@code numericTag} is considered null. Don't use 0 for the {@code numericTag} unless you intend - * to specify null. In that case you are encouraged to use {@link #create(String)}. - * - *

Invocations to {@code create} methods in this class are a no-op unless PerfMark - * instrumentation is enabled. If so, calls to {@code create} methods to this class are replaced for - * calls to {@link TagFactory} create methods. - * - *

This class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this - * yet. - */ -@Immutable -public final class PerfTag { - - private static final long NULL_NUMERIC_TAG = 0; - private static final String NULL_STRING_TAG = null; - - private final long numericTag; - private final String stringTag; - - private PerfTag(long numericTag, @Nullable String stringTag) { - this.numericTag = numericTag; - this.stringTag = stringTag; - } - - /** Returns the numeric tag if set, or {@link #NULL_NUMERIC_TAG} instead. */ - public long getNumericTag() { - return numericTag; - } - - /** Returns the string tag if set, or {@link #NULL_STRING_TAG} instead. */ - @Nullable public String getStringTag() { - return stringTag; - } - - @Override - public String toString() { - return "Tag(numericTag=" + numericTag + ",stringTag='" + stringTag + "')"; - } - - @Override - public int hashCode() { - int longHashCode = (int)(numericTag ^ (numericTag >>> 32)); - return longHashCode + (stringTag != null ? stringTag.hashCode() : 31); - } - - @Override - @SuppressWarnings("ReferenceEquality") // No Java 8 yet. - public boolean equals(Object obj) { - if (!(obj instanceof PerfTag)) { - return false; - } - PerfTag that = (PerfTag) obj; - return numericTag == that.numericTag - && (stringTag == that.stringTag || (stringTag != null && stringTag.equals(that.stringTag))); - } - - /** - * Provides methods that create Tag instances which should not be directly invoked by clients. - * - *

Calls to {@link PerfMark#create(long)}, {@link PerfMark#create(long, String)} and {@link - * PerfMark#create(String)} are replaced with calls to the methods in this class using bytecode - * rewriting, if enabled. - */ - static final class TagFactory { - /** - * This class should not be instantiated. - */ - private TagFactory() { - throw new AssertionError("nope"); - } - - public static PerfTag create(long numericTag, String stringTag) { - return new PerfTag(numericTag, stringTag); - } - - public static PerfTag create(String stringTag) { - return new PerfTag(NULL_NUMERIC_TAG, stringTag); - } - - public static PerfTag create(long numericTag) { - return new PerfTag(numericTag, NULL_STRING_TAG); - } - - static PerfTag create() { - return new PerfTag(NULL_NUMERIC_TAG, NULL_STRING_TAG); - } - } -} - diff --git a/core/src/main/java/io/grpc/perfmark/package-info.java b/core/src/main/java/io/grpc/perfmark/package-info.java deleted file mode 100644 index 1f0b4dab70..0000000000 --- a/core/src/main/java/io/grpc/perfmark/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2019 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This is an internal, experimental API and not subject to the normal compatibility guarantees. - * - * @see io.grpc.Internal - */ -@javax.annotation.CheckReturnValue -@javax.annotation.ParametersAreNonnullByDefault -package io.grpc.perfmark; diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index ea34d4ce35..a4c0da2d69 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -43,6 +43,7 @@ import io.grpc.ServerCall; import io.grpc.Status; import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl; import io.grpc.internal.testing.SingleMessageProducer; +import io.perfmark.PerfMark; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.InputStreamReader; @@ -90,7 +91,7 @@ public class ServerCallImplTest { context = Context.ROOT.withCancellation(); call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer); + serverCallTracer, PerfMark.createTag()); } @Test @@ -113,7 +114,7 @@ public class ServerCallImplTest { call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - tracer); + tracer, PerfMark.createTag()); // required boilerplate call.sendHeaders(new Metadata()); @@ -224,7 +225,8 @@ public class ServerCallImplTest { context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer); + serverCallTracer, + PerfMark.createTag()); serverCall.sendHeaders(new Metadata()); serverCall.sendMessage(1L); verify(stream, times(1)).writeMessage(any(InputStream.class)); @@ -258,7 +260,8 @@ public class ServerCallImplTest { context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer); + serverCallTracer, + PerfMark.createTag()); serverCall.sendHeaders(new Metadata()); serverCall.sendMessage(1L); serverCall.sendMessage(1L); @@ -295,7 +298,8 @@ public class ServerCallImplTest { context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer); + serverCallTracer, + PerfMark.createTag()); serverCall.close(Status.OK, new Metadata()); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); verify(stream, times(1)).cancel(statusCaptor.capture()); diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 23042890a2..ea6c43a264 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -77,6 +77,7 @@ import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener; import io.grpc.internal.testing.SingleMessageProducer; import io.grpc.internal.testing.TestServerStreamTracer; import io.grpc.util.MutableHandlerRegistry; +import io.perfmark.PerfMark; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; @@ -1140,7 +1141,8 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation()); + Context.ROOT.withCancellation(), + PerfMark.createTag()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1165,7 +1167,8 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation()); + Context.ROOT.withCancellation(), + PerfMark.createTag()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1190,7 +1193,8 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation()); + Context.ROOT.withCancellation(), + PerfMark.createTag()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1213,7 +1217,8 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation()); + Context.ROOT.withCancellation(), + PerfMark.createTag()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1236,7 +1241,8 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation()); + Context.ROOT.withCancellation(), + PerfMark.createTag()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1259,7 +1265,8 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation()); + Context.ROOT.withCancellation(), + PerfMark.createTag()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); diff --git a/netty/BUILD.bazel b/netty/BUILD.bazel index 30f02bd985..9680660067 100644 --- a/netty/BUILD.bazel +++ b/netty/BUILD.bazel @@ -25,5 +25,6 @@ java_library( "@io_netty_netty_handler_proxy//jar", "@io_netty_netty_resolver//jar", "@io_netty_netty_transport//jar", + "@io_perfmark_perfmark_api//jar", ], ) diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index ba49010c6b..0949faa981 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -43,6 +43,7 @@ import io.netty.channel.EventLoop; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Stream; import io.netty.util.AsciiString; +import io.perfmark.PerfMark; import javax.annotation.Nullable; /** @@ -114,8 +115,18 @@ class NettyClientStream extends AbstractClientStream { } private class Sink implements AbstractClientStream.Sink { + @Override public void writeHeaders(Metadata headers, byte[] requestPayload) { + PerfMark.startTask("NettyClientStream$Sink.writeHeaders"); + try { + writeHeadersInternal(headers, requestPayload); + } finally { + PerfMark.stopTask("NettyClientStream$Sink.writeHeaders"); + } + } + + private void writeHeadersInternal(Metadata headers, byte[] requestPayload) { // Convert the headers into Netty HTTP/2 headers. AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method); if (defaultPath == null) { @@ -152,15 +163,13 @@ class NettyClientStream extends AbstractClientStream { } } }; - // Write the command requesting the creation of the stream. writeQueue.enqueue( new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get), !method.getType().clientSendsOneMessage() || get).addListener(failureListener); } - @Override - public void writeFrame( + private void writeFrameInternal( WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) { Preconditions.checkArgument(numMessages >= 0); ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf(); @@ -184,12 +193,23 @@ class NettyClientStream extends AbstractClientStream { }); } else { // The frame is empty and will not impact outbound flow control. Just send it. - writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush); + writeQueue.enqueue( + new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush); } } @Override - public void request(final int numMessages) { + public void writeFrame( + WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { + PerfMark.startTask("NettyClientStream$Sink.writeFrame"); + try { + writeFrameInternal(frame, endOfStream, flush, numMessages); + } finally { + PerfMark.stopTask("NettyClientStream$Sink.writeFrame"); + } + } + + private void requestInternal(final int numMessages) { if (channel.eventLoop().inEventLoop()) { // Processing data read in the event loop so can call into the deframer immediately transportState().requestMessagesFromDeframer(numMessages); @@ -203,9 +223,24 @@ class NettyClientStream extends AbstractClientStream { } } + @Override + public void request(int numMessages) { + PerfMark.startTask("NettyClientStream$Sink.request"); + try { + requestInternal(numMessages); + } finally { + PerfMark.stopTask("NettyClientStream$Sink.request"); + } + } + @Override public void cancel(Status status) { - writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true); + PerfMark.startTask("NettyClientStream$Sink.cancel"); + try { + writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true); + } finally { + PerfMark.stopTask("NettyClientStream$Sink.cancel"); + } } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index e9d221aa28..64ab6a3bda 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -33,6 +33,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.EventLoop; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Stream; +import io.perfmark.PerfMark; import java.util.logging.Level; import java.util.logging.Logger; @@ -91,8 +92,8 @@ class NettyServerStream extends AbstractServerStream { } private class Sink implements AbstractServerStream.Sink { - @Override - public void request(final int numMessages) { + + private void requestInternal(final int numMessages) { if (channel.eventLoop().inEventLoop()) { // Processing data read in the event loop so can call into the deframer immediately transportState().requestMessagesFromDeframer(numMessages); @@ -107,16 +108,30 @@ class NettyServerStream extends AbstractServerStream { } @Override - public void writeHeaders(Metadata headers) { - writeQueue.enqueue( - SendResponseHeadersCommand.createHeaders( - transportState(), - Utils.convertServerHeaders(headers)), - true); + public void request(final int numMessages) { + PerfMark.startTask("NettyServerStream$Sink.request"); + try { + requestInternal(numMessages); + } finally { + PerfMark.stopTask("NettyServerStream$Sink.request"); + } } @Override - public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) { + public void writeHeaders(Metadata headers) { + PerfMark.startTask("NettyServerStream$Sink.writeHeaders"); + try { + writeQueue.enqueue( + SendResponseHeadersCommand.createHeaders( + transportState(), + Utils.convertServerHeaders(headers)), + true); + } finally { + PerfMark.stopTask("NettyServerStream$Sink.writeHeaders"); + } + } + + private void writeFrameInternal(WritableBuffer frame, boolean flush, final int numMessages) { Preconditions.checkArgument(numMessages >= 0); if (frame == null) { writeQueue.scheduleFlush(); @@ -140,17 +155,37 @@ class NettyServerStream extends AbstractServerStream { }); } + @Override + public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) { + PerfMark.startTask("NettyServerStream$Sink.writeFrame"); + try { + writeFrameInternal(frame, flush, numMessages); + } finally { + PerfMark.stopTask("NettyServerStream$Sink.writeFrame"); + } + } + @Override public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { - Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent); - writeQueue.enqueue( - SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status), - true); + PerfMark.startTask("NettyServerStream$Sink.writeTrailers"); + try { + Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent); + writeQueue.enqueue( + SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status), + true); + } finally { + PerfMark.stopTask("NettyServerStream$Sink.writeTrailers"); + } } @Override public void cancel(Status status) { - writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true); + PerfMark.startTask("NettyServerStream$Sink.cancel"); + try { + writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true); + } finally { + PerfMark.startTask("NettyServerStream$Sink.cancel"); + } } } diff --git a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java index 71e6bcfe29..4a343e8d6e 100644 --- a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java +++ b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java @@ -21,6 +21,8 @@ import io.netty.buffer.ByteBufHolder; import io.netty.buffer.DefaultByteBufHolder; import io.netty.channel.Channel; import io.netty.channel.ChannelPromise; +import io.perfmark.Link; +import io.perfmark.PerfMark; /** * Command sent from the transport to the Netty channel to send a GRPC frame to the remote endpoint. @@ -28,6 +30,7 @@ import io.netty.channel.ChannelPromise; final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.QueuedCommand { private final StreamIdHolder stream; private final boolean endStream; + private final Link link; private ChannelPromise promise; @@ -35,6 +38,12 @@ final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQu super(content); this.stream = stream; this.endStream = endStream; + this.link = PerfMark.link(); + } + + @Override + public Link getLink() { + return link; } int streamId() { diff --git a/netty/src/main/java/io/grpc/netty/WriteQueue.java b/netty/src/main/java/io/grpc/netty/WriteQueue.java index 24063ad181..f3ef7ce0e1 100644 --- a/netty/src/main/java/io/grpc/netty/WriteQueue.java +++ b/netty/src/main/java/io/grpc/netty/WriteQueue.java @@ -22,6 +22,8 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; +import io.perfmark.Link; +import io.perfmark.PerfMark; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -104,26 +106,44 @@ class WriteQueue { * called in the event loop */ private void flush() { + PerfMark.startTask("WriteQueue.periodicFlush"); try { QueuedCommand cmd; int i = 0; boolean flushedOnce = false; while ((cmd = queue.poll()) != null) { - cmd.run(channel); + PerfMark.startTask("WriteQueue.run"); + try { + cmd.getLink().link(); + cmd.run(channel); + } finally { + PerfMark.stopTask("WriteQueue.run"); + } if (++i == DEQUE_CHUNK_SIZE) { i = 0; // Flush each chunk so we are releasing buffers periodically. In theory this loop // might never end as new events are continuously added to the queue, if we never // flushed in that case we would be guaranteed to OOM. - channel.flush(); + PerfMark.startTask("WriteQueue.flush0"); + try { + channel.flush(); + } finally { + PerfMark.stopTask("WriteQueue.flush0"); + } flushedOnce = true; } } // Must flush at least once, even if there were no writes. if (i != 0 || !flushedOnce) { - channel.flush(); + PerfMark.startTask("WriteQueue.flush1"); + try { + channel.flush(); + } finally { + PerfMark.stopTask("WriteQueue.flush1"); + } } } finally { + PerfMark.stopTask("WriteQueue.periodicFlush"); // Mark the write as done, if the queue is non-empty after marking trigger a new write. scheduled.set(false); if (!queue.isEmpty()) { @@ -134,8 +154,10 @@ class WriteQueue { private static class RunnableCommand implements QueuedCommand { private final Runnable runnable; + private final Link link; public RunnableCommand(Runnable runnable) { + this.link = PerfMark.link(); this.runnable = runnable; } @@ -153,11 +175,21 @@ class WriteQueue { public final void run(Channel channel) { runnable.run(); } + + @Override + public Link getLink() { + return link; + } } abstract static class AbstractQueuedCommand implements QueuedCommand { private ChannelPromise promise; + private final Link link; + + AbstractQueuedCommand() { + this.link = PerfMark.link(); + } @Override public final void promise(ChannelPromise promise) { @@ -173,6 +205,11 @@ class WriteQueue { public final void run(Channel channel) { channel.write(this, promise); } + + @Override + public Link getLink() { + return link; + } } /** @@ -190,5 +227,7 @@ class WriteQueue { void promise(ChannelPromise promise); void run(Channel channel); + + Link getLink(); } } diff --git a/repositories.bzl b/repositories.bzl index 4fd98cec56..c77df33038 100644 --- a/repositories.bzl +++ b/repositories.bzl @@ -35,6 +35,7 @@ def grpc_java_repositories( omit_io_netty_tcnative_boringssl_static = False, omit_io_opencensus_api = False, omit_io_opencensus_grpc_metrics = False, + omit_io_perfmark = False, omit_javax_annotation = False, omit_junit_junit = False, omit_net_zlib = False, @@ -103,6 +104,8 @@ def grpc_java_repositories( io_opencensus_api() if not omit_io_opencensus_grpc_metrics: io_opencensus_grpc_metrics() + if not omit_io_perfmark: + io_perfmark() if not omit_javax_annotation: javax_annotation() if not omit_junit_junit: @@ -402,6 +405,15 @@ def io_opencensus_grpc_metrics(): licenses = ["notice"], # Apache 2.0 ) +def io_perfmark(): + jvm_maven_import_external( + name = "io_perfmark_perfmark_api", + artifact = "io.perfmark:perfmark-api:0.16.0", + server_urls = ["http://central.maven.org/maven2"], + artifact_sha256 = "a93667875ea9d10315177768739a18d6c667df041c982d2841645ae8558d0af0", + licenses = ["notice"], # Apache 2.0 + ) + def javax_annotation(): # Use //stub:javax_annotation for neverlink=1 support. jvm_maven_import_external(