From 2db3abc9adb76f5a249dd9f503ca9fa3f5ff2cb9 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Fri, 7 Jun 2019 17:23:49 -0700 Subject: [PATCH] Revert "core, netty: add io.perfmark Annotations" (#5853) This causes internal breakage which needs to be resolved before continuing. This reverts commit 71967622d635160172e0739541f88197f9da70a2. --- build.gradle | 1 - core/BUILD.bazel | 24 ++- core/build.gradle | 1 - .../java/io/grpc/internal/ClientCallImpl.java | 139 ++++---------- .../java/io/grpc/internal/ServerCallImpl.java | 98 +++++----- .../java/io/grpc/internal/ServerImpl.java | 106 ++--------- .../io/grpc/perfmark/InternalPerfMark.java | 49 +++++ .../main/java/io/grpc/perfmark/PerfMark.java | 171 ++++++++++++++++++ .../java/io/grpc/perfmark/PerfMarkTask.java | 31 ++++ .../java/io/grpc/perfmark/PerfMarker.java | 71 ++++++++ .../main/java/io/grpc/perfmark/PerfTag.java | 118 ++++++++++++ .../java/io/grpc/perfmark/package-info.java | 24 +++ .../io/grpc/internal/ServerCallImplTest.java | 14 +- .../java/io/grpc/internal/ServerImplTest.java | 19 +- netty/BUILD.bazel | 1 - .../java/io/grpc/netty/NettyClientStream.java | 47 +---- .../java/io/grpc/netty/NettyServerStream.java | 63 ++----- .../io/grpc/netty/SendGrpcFrameCommand.java | 9 - .../main/java/io/grpc/netty/WriteQueue.java | 45 +---- repositories.bzl | 12 -- 20 files changed, 615 insertions(+), 428 deletions(-) create mode 100644 core/src/main/java/io/grpc/perfmark/InternalPerfMark.java create mode 100644 core/src/main/java/io/grpc/perfmark/PerfMark.java create mode 100644 core/src/main/java/io/grpc/perfmark/PerfMarkTask.java create mode 100644 core/src/main/java/io/grpc/perfmark/PerfMarker.java create mode 100644 core/src/main/java/io/grpc/perfmark/PerfTag.java create mode 100644 core/src/main/java/io/grpc/perfmark/package-info.java diff --git a/build.gradle b/build.gradle index 188c5768cc..b7b68aa611 100644 --- a/build.gradle +++ b/build.gradle @@ -201,7 +201,6 @@ subprojects { opencensus_impl: "io.opencensus:opencensus-impl:${opencensusVersion}", opencensus_impl_lite: "io.opencensus:opencensus-impl-lite:${opencensusVersion}", instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3', - perfmark: 'io.perfmark:perfmark-api:0.16.0', protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}", protobuf_lite: "com.google.protobuf:protobuf-lite:3.0.1", protoc_lite: "com.google.protobuf:protoc-gen-javalite:3.0.0", diff --git a/core/BUILD.bazel b/core/BUILD.bazel index 4374e7b7d9..3a22b50aac 100644 --- a/core/BUILD.bazel +++ b/core/BUILD.bazel @@ -1,3 +1,16 @@ +PERFMARK_INTERNAL_ACCESSOR_SRCS = glob( + [ + "src/main/java/io/grpc/perfmark/Internal*.java", + ], +) + +PERFMARK_SRCS = glob( + [ + "src/main/java/io/grpc/perfmark/*.java", + ], + exclude = PERFMARK_INTERNAL_ACCESSOR_SRCS, +) + java_library( name = "core", visibility = ["//visibility:public"], @@ -30,6 +43,7 @@ java_library( ]), visibility = ["//:__subpackages__"], deps = [ + ":perfmark", "//api", "//context", "@com_google_android_annotations//jar", @@ -40,7 +54,6 @@ java_library( "@com_google_j2objc_j2objc_annotations//jar", "@io_opencensus_opencensus_api//jar", "@io_opencensus_opencensus_contrib_grpc_metrics//jar", - "@io_perfmark_perfmark_api//jar", "@org_codehaus_mojo_animal_sniffer_annotations//jar", ], ) @@ -63,3 +76,12 @@ java_library( ], ) +java_library( + name = "perfmark", + srcs = PERFMARK_SRCS, + visibility = ["//:__subpackages__"], + deps = [ + "@com_google_code_findbugs_jsr305//jar", + "@com_google_errorprone_error_prone_annotations//jar", + ], +) diff --git a/core/build.gradle b/core/build.gradle index e57eb34446..c822177932 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -4,7 +4,6 @@ dependencies { compile project(':grpc-api'), libraries.gson, libraries.android_annotations, - libraries.perfmark compile (libraries.opencensus_api) { // prefer 3.0.2 from libraries instead of 3.0.1 exclude group: 'com.google.code.findbugs', module: 'jsr305' diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 976c7c24cf..ff9e930d81 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -46,9 +46,8 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; -import io.perfmark.Link; -import io.perfmark.PerfMark; -import io.perfmark.Tag; +import io.grpc.perfmark.PerfMark; +import io.grpc.perfmark.PerfTag; import java.io.InputStream; import java.nio.charset.Charset; import java.util.concurrent.CancellationException; @@ -70,7 +69,7 @@ final class ClientCallImpl extends ClientCall { = "gzip".getBytes(Charset.forName("US-ASCII")); private final MethodDescriptor method; - private final Tag tag; + private final PerfTag tag; private final Executor callExecutor; private final CallTracer channelCallsTracer; private final Context context; @@ -97,7 +96,7 @@ final class ClientCallImpl extends ClientCall { boolean retryEnabled) { this.method = method; // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl. - this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this)); + this.tag = PerfMark.createTag(method.getFullMethodName()); // If we know that the executor is a direct executor, we don't need to wrap it with a // SerializingExecutor. This is purely for performance reasons. // See https://github.com/grpc/grpc-java/issues/368 @@ -113,7 +112,6 @@ final class ClientCallImpl extends ClientCall { this.clientTransportProvider = clientTransportProvider; this.deadlineCancellationExecutor = deadlineCancellationExecutor; this.retryEnabled = retryEnabled; - PerfMark.event("ClientCall.", tag); } private final class ContextCancellationListener implements CancellationListener { @@ -185,11 +183,11 @@ final class ClientCallImpl extends ClientCall { @Override public void start(Listener observer, Metadata headers) { - PerfMark.startTask("ClientCall.start", tag); + PerfMark.taskStart(tag, "ClientCall.start"); try { startInternal(observer, headers); } finally { - PerfMark.stopTask("ClientCall.start", tag); + PerfMark.taskEnd(tag, "ClientCall.start"); } } @@ -380,23 +378,18 @@ final class ClientCallImpl extends ClientCall { @Override public void request(int numMessages) { - PerfMark.startTask("ClientCall.request", tag); - try { - checkState(stream != null, "Not started"); - checkArgument(numMessages >= 0, "Number requested must be non-negative"); - stream.request(numMessages); - } finally { - PerfMark.stopTask("ClientCall.cancel", tag); - } + checkState(stream != null, "Not started"); + checkArgument(numMessages >= 0, "Number requested must be non-negative"); + stream.request(numMessages); } @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { - PerfMark.startTask("ClientCall.cancel", tag); + PerfMark.taskStart(tag, "ClientCall.cancel"); try { cancelInternal(message, cause); } finally { - PerfMark.stopTask("ClientCall.cancel", tag); + PerfMark.taskEnd(tag, "ClientCall.cancel"); } } @@ -431,11 +424,11 @@ final class ClientCallImpl extends ClientCall { @Override public void halfClose() { - PerfMark.startTask("ClientCall.halfClose", tag); + PerfMark.taskStart(tag, "ClientCall.halfClose"); try { halfCloseInternal(); } finally { - PerfMark.stopTask("ClientCall.halfClose", tag); + PerfMark.taskEnd(tag, "ClientCall.halfClose"); } } @@ -449,11 +442,11 @@ final class ClientCallImpl extends ClientCall { @Override public void sendMessage(ReqT message) { - PerfMark.startTask("ClientCall.sendMessage", tag); + PerfMark.taskStart(tag, "ClientCall.sendMessage"); try { sendMessageInternal(message); } finally { - PerfMark.stopTask("ClientCall.sendMessage", tag); + PerfMark.taskEnd(tag, "ClientCall.sendMessage"); } } @@ -522,29 +515,17 @@ final class ClientCallImpl extends ClientCall { @Override public void headersRead(final Metadata headers) { - PerfMark.startTask("ClientStreamListener.headersRead", tag); - final Link link = PerfMark.link(); - final class HeadersRead extends ContextRunnable { HeadersRead() { super(context); } @Override - public void runInContext() { - PerfMark.startTask("ClientCall$Listener.headersRead", tag); - link.link(); - try { - runInternal(); - } finally { - PerfMark.stopTask("ClientCall$Listener.headersRead", tag); - } - } - - private void runInternal() { + public final void runInContext() { if (closed) { return; } + PerfMark.taskStart(tag, "ClientCall.headersRead"); try { observer.onHeaders(headers); } catch (Throwable t) { @@ -552,43 +533,29 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to read headers"); stream.cancel(status); close(status, new Metadata()); + } finally { + PerfMark.taskEnd(tag, "ClientCall.headersRead"); } } } - try { - callExecutor.execute(new HeadersRead()); - } finally { - PerfMark.stopTask("ClientStreamListener.headersRead", tag); - } + callExecutor.execute(new HeadersRead()); } @Override public void messagesAvailable(final MessageProducer producer) { - PerfMark.startTask("ClientStreamListener.messagesAvailable", tag); - final Link link = PerfMark.link(); - final class MessagesAvailable extends ContextRunnable { MessagesAvailable() { super(context); } @Override - public void runInContext() { - PerfMark.startTask("ClientCall$Listener.messagesAvailable", tag); - link.link(); - try { - runInternal(); - } finally { - PerfMark.stopTask("ClientCall$Listener.messagesAvailable", tag); - } - } - - private void runInternal() { + public final void runInContext() { if (closed) { GrpcUtil.closeQuietly(producer); return; } + PerfMark.taskStart(tag, "ClientCall.messagesAvailable"); try { InputStream message; while ((message = producer.next()) != null) { @@ -606,15 +573,13 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to read message."); stream.cancel(status); close(status, new Metadata()); + } finally { + PerfMark.taskEnd(tag, "ClientCall.messagesAvailable"); } } } - try { - callExecutor.execute(new MessagesAvailable()); - } finally { - PerfMark.stopTask("ClientStreamListener.messagesAvailable", tag); - } + callExecutor.execute(new MessagesAvailable()); } /** @@ -638,16 +603,6 @@ final class ClientCallImpl extends ClientCall { @Override public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { - PerfMark.startTask("ClientStreamListener.closed", tag); - try { - closedInternal(status, rpcProgress, trailers); - } finally { - PerfMark.stopTask("ClientStreamListener.closed", tag); - } - } - - private void closedInternal( - Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) { Deadline deadline = effectiveDeadline(); if (status.getCode() == Status.Code.CANCELLED && deadline != null) { // When the server's deadline expires, it can only reset the stream with CANCEL and no @@ -661,29 +616,23 @@ final class ClientCallImpl extends ClientCall { } final Status savedStatus = status; final Metadata savedTrailers = trailers; - final Link link = PerfMark.link(); final class StreamClosed extends ContextRunnable { StreamClosed() { super(context); } @Override - public void runInContext() { - PerfMark.startTask("ClientCall$Listener.onClose", tag); - link.link(); - try { - runInternal(); - } finally { - PerfMark.stopTask("ClientCall$Listener.onClose", tag); - } - } - - private void runInternal() { + public final void runInContext() { if (closed) { // We intentionally don't keep the status or metadata from the server. return; } - close(savedStatus, savedTrailers); + PerfMark.taskStart(tag, "ClientCall.closed"); + try { + close(savedStatus, savedTrailers); + } finally { + PerfMark.taskEnd(tag, "ClientCall.closed"); + } } } @@ -692,26 +641,14 @@ final class ClientCallImpl extends ClientCall { @Override public void onReady() { - PerfMark.startTask("ClientStreamListener.onReady", tag); - final Link link = PerfMark.link(); - final class StreamOnReady extends ContextRunnable { StreamOnReady() { super(context); } @Override - public void runInContext() { - PerfMark.startTask("ClientCall$Listener.onReady", tag); - link.link(); - try { - runInternal(); - } finally { - PerfMark.stopTask("ClientCall$Listener.onReady", tag); - } - } - - private void runInternal() { + public final void runInContext() { + PerfMark.taskStart(tag, "ClientCall.onReady"); try { observer.onReady(); } catch (Throwable t) { @@ -719,15 +656,13 @@ final class ClientCallImpl extends ClientCall { Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."); stream.cancel(status); close(status, new Metadata()); + } finally { + PerfMark.taskEnd(tag, "ClientCall.onReady"); } } } - try { - callExecutor.execute(new StreamOnReady()); - } finally { - PerfMark.stopTask("ClientStreamListener.onReady", tag); - } + callExecutor.execute(new StreamOnReady()); } } } diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index 6f123e7667..ec50c76414 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -37,8 +37,8 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.Status; -import io.perfmark.PerfMark; -import io.perfmark.Tag; +import io.grpc.perfmark.PerfMark; +import io.grpc.perfmark.PerfTag; import java.io.InputStream; import java.util.logging.Level; import java.util.logging.Logger; @@ -54,7 +54,7 @@ final class ServerCallImpl extends ServerCall { private final ServerStream stream; private final MethodDescriptor method; - private final Tag tag; + private final PerfTag tag; private final Context.CancellableContext context; private final byte[] messageAcceptEncoding; private final DecompressorRegistry decompressorRegistry; @@ -71,35 +71,31 @@ final class ServerCallImpl extends ServerCall { ServerCallImpl(ServerStream stream, MethodDescriptor method, Metadata inboundHeaders, Context.CancellableContext context, DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry, - CallTracer serverCallTracer, Tag tag) { + CallTracer serverCallTracer) { this.stream = stream; this.method = method; + // TODO(carl-mastrangelo): consider moving this to the ServerImpl to record startCall. + this.tag = PerfMark.createTag(method.getFullMethodName()); this.context = context; this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY); this.decompressorRegistry = decompressorRegistry; this.compressorRegistry = compressorRegistry; this.serverCallTracer = serverCallTracer; this.serverCallTracer.reportCallStarted(); - this.tag = tag; } @Override public void request(int numMessages) { - PerfMark.startTask("ServerCall.request", tag); - try { - stream.request(numMessages); - } finally { - PerfMark.stopTask("ServerCall.request", tag); - } + stream.request(numMessages); } @Override public void sendHeaders(Metadata headers) { - PerfMark.startTask("ServerCall.sendHeaders", tag); + PerfMark.taskStart(tag, "ServerCall.sendHeaders"); try { sendHeadersInternal(headers); } finally { - PerfMark.stopTask("ServerCall.sendHeaders", tag); + PerfMark.taskEnd(tag, "ServerCall.sendHeaders"); } } @@ -144,11 +140,11 @@ final class ServerCallImpl extends ServerCall { @Override public void sendMessage(RespT message) { - PerfMark.startTask("ServerCall.sendMessage", tag); + PerfMark.taskStart(tag, "ServerCall.sendMessage"); try { sendMessageInternal(message); } finally { - PerfMark.stopTask("ServerCall.sendMessage", tag); + PerfMark.taskEnd(tag, "ServerCall.sendMessage"); } } @@ -197,11 +193,11 @@ final class ServerCallImpl extends ServerCall { @Override public void close(Status status, Metadata trailers) { - PerfMark.startTask("ServerCall.close", tag); + PerfMark.taskStart(tag, "ServerCall.close"); try { closeInternal(status, trailers); } finally { - PerfMark.stopTask("ServerCall.close", tag); + PerfMark.taskEnd(tag, "ServerCall.close"); } } @@ -285,23 +281,15 @@ final class ServerCallImpl extends ServerCall { MoreExecutors.directExecutor()); } - @Override - public void messagesAvailable(MessageProducer producer) { - PerfMark.startTask("ServerStreamListener.messagesAvailable", call.tag); - try { - messagesAvailableInternal(producer); - } finally { - PerfMark.stopTask("ServerStreamListener.messagesAvailable", call.tag); - } - } - @SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try - private void messagesAvailableInternal(final MessageProducer producer) { + @Override + public void messagesAvailable(final MessageProducer producer) { if (call.cancelled) { GrpcUtil.closeQuietly(producer); return; } + PerfMark.taskStart(call.tag, "ServerCall.messagesAvailable"); InputStream message; try { while ((message = producer.next()) != null) { @@ -317,58 +305,58 @@ final class ServerCallImpl extends ServerCall { GrpcUtil.closeQuietly(producer); Throwables.throwIfUnchecked(t); throw new RuntimeException(t); + } finally { + PerfMark.taskEnd(call.tag, "ServerCall.messagesAvailable"); } } @Override public void halfClosed() { - PerfMark.startTask("ServerStreamListener.halfClosed", call.tag); - try { - if (call.cancelled) { - return; - } + if (call.cancelled) { + return; + } + PerfMark.taskStart(call.tag, "ServerCall.halfClosed"); + + try { listener.onHalfClose(); } finally { - PerfMark.stopTask("ServerStreamListener.halfClosed", call.tag); + PerfMark.taskEnd(call.tag, "ServerCall.halfClosed"); } } @Override public void closed(Status status) { - PerfMark.startTask("ServerStreamListener.closed", call.tag); + PerfMark.taskStart(call.tag, "ServerCall.closed"); try { - closedInternal(status); - } finally { - PerfMark.stopTask("ServerStreamListener.closed", call.tag); - } - } + try { + if (status.isOk()) { + listener.onComplete(); + } else { + call.cancelled = true; + listener.onCancel(); + } + } finally { + // Cancel context after delivering RPC closure notification to allow the application to + // clean up and update any state based on whether onComplete or onCancel was called. + context.cancel(null); - private void closedInternal(Status status) { - try { - if (status.isOk()) { - listener.onComplete(); - } else { - call.cancelled = true; - listener.onCancel(); } } finally { - // Cancel context after delivering RPC closure notification to allow the application to - // clean up and update any state based on whether onComplete or onCancel was called. - context.cancel(null); + PerfMark.taskEnd(call.tag, "ServerCall.closed"); } } @Override public void onReady() { - PerfMark.startTask("ServerStreamListener.onReady", call.tag); + if (call.cancelled) { + return; + } + PerfMark.taskStart(call.tag, "ServerCall.closed"); try { - if (call.cancelled) { - return; - } listener.onReady(); } finally { - PerfMark.stopTask("ServerCall.closed", call.tag); + PerfMark.taskEnd(call.tag, "ServerCall.closed"); } } } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 886fa0ff7e..4e4c3e1437 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -50,9 +50,6 @@ import io.grpc.ServerMethodDefinition; import io.grpc.ServerServiceDefinition; import io.grpc.ServerTransportFilter; import io.grpc.Status; -import io.perfmark.Link; -import io.perfmark.PerfMark; -import io.perfmark.Tag; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; @@ -463,21 +460,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume transportClosed(transport); } - @Override - public void streamCreated(ServerStream stream, String methodName, Metadata headers) { - Tag tag = PerfMark.createTag(methodName, stream.hashCode()); - PerfMark.startTask("ServerTransportListener.streamCreated", tag); - try { - streamCreatedInternal(stream, methodName, headers, tag); - } finally { - PerfMark.stopTask("ServerTransportListener.streamCreated", tag); - } - } - - private void streamCreatedInternal( - final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) { - + public void streamCreated( + final ServerStream stream, final String methodName, final Metadata headers) { if (headers.containsKey(MESSAGE_ENCODING_KEY)) { String encoding = headers.get(MESSAGE_ENCODING_KEY); Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding); @@ -504,33 +489,22 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume wrappedExecutor = new SerializingExecutor(executor); } - final Link link = PerfMark.link(); - final JumpToApplicationThreadServerStreamListener jumpListener = new JumpToApplicationThreadServerStreamListener( - wrappedExecutor, executor, stream, context, tag); + wrappedExecutor, executor, stream, context); stream.setListener(jumpListener); // Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks // are delivered, including any errors. Callbacks can still be triggered, but they will be // queued. final class StreamCreated extends ContextRunnable { + StreamCreated() { super(context); } @Override public void runInContext() { - PerfMark.startTask("ServerTransportListener$StreamCreated.startCall", tag); - link.link(); - try { - runInternal(); - } finally { - PerfMark.stopTask("ServerTransportListener$StreamCreated.startCall", tag); - } - } - - private void runInternal() { ServerStreamListener listener = NOOP_LISTENER; try { ServerMethodDefinition method = registry.lookupMethod(methodName); @@ -549,8 +523,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume context.cancel(null); return; } - listener = - startCall(stream, methodName, method, headers, context, statsTraceCtx, tag); + listener = startCall(stream, methodName, method, headers, context, statsTraceCtx); } catch (RuntimeException e) { stream.close(Status.fromThrowable(e), new Metadata()); context.cancel(null); @@ -600,7 +573,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume /** Never returns {@code null}. */ private ServerStreamListener startCall(ServerStream stream, String fullMethodName, ServerMethodDefinition methodDef, Metadata headers, - Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) { + Context.CancellableContext context, StatsTraceContext statsTraceCtx) { // TODO(ejona86): should we update fullMethodName to have the canonical path of the method? statsTraceCtx.serverCallStarted( new ServerCallInfoImpl<>( @@ -614,7 +587,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume ServerMethodDefinition interceptedDef = methodDef.withServerCallHandler(handler); ServerMethodDefinition wMethodDef = binlog == null ? interceptedDef : binlog.wrapMethodDefinition(interceptedDef); - return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag); + return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context); } private ServerStreamListener startWrappedCall( @@ -622,8 +595,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume ServerMethodDefinition methodDef, ServerStream stream, Metadata headers, - Context.CancellableContext context, - Tag tag) { + Context.CancellableContext context) { ServerCallImpl call = new ServerCallImpl<>( stream, @@ -632,8 +604,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume context, decompressorRegistry, compressorRegistry, - serverCallTracer, - tag); + serverCallTracer); ServerCall.Listener listener = methodDef.getServerCallHandler().startCall(call, headers); @@ -716,17 +687,15 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume private final Executor cancelExecutor; private final Context.CancellableContext context; private final ServerStream stream; - private final Tag tag; // Only accessed from callExecutor. private ServerStreamListener listener; public JumpToApplicationThreadServerStreamListener(Executor executor, - Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) { + Executor cancelExecutor, ServerStream stream, Context.CancellableContext context) { this.callExecutor = executor; this.cancelExecutor = cancelExecutor; this.stream = stream; this.context = context; - this.tag = tag; } /** @@ -756,8 +725,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void messagesAvailable(final MessageProducer producer) { - PerfMark.startTask("ServerStreamListener.messagesAvailable", tag); - final Link link = PerfMark.link(); final class MessagesAvailable extends ContextRunnable { @@ -767,8 +734,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { - PerfMark.startTask("ServerCallListener(app).messagesAvailable", tag); - link.link(); try { getListener().messagesAvailable(producer); } catch (RuntimeException e) { @@ -777,24 +742,15 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume } catch (Error e) { internalClose(); throw e; - } finally { - PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag); } } } - try { - callExecutor.execute(new MessagesAvailable()); - } finally { - PerfMark.stopTask("ServerStreamListener.messagesAvailable", tag); - } + callExecutor.execute(new MessagesAvailable()); } @Override public void halfClosed() { - PerfMark.startTask("ServerStreamListener.halfClosed", tag); - final Link link = PerfMark.link(); - final class HalfClosed extends ContextRunnable { HalfClosed() { super(context); @@ -802,8 +758,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { - PerfMark.startTask("ServerCallListener(app).halfClosed", tag); - link.link(); try { getListener().halfClosed(); } catch (RuntimeException e) { @@ -812,30 +766,15 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume } catch (Error e) { internalClose(); throw e; - } finally { - PerfMark.stopTask("ServerCallListener(app).halfClosed", tag); } } } - try { - callExecutor.execute(new HalfClosed()); - } finally { - PerfMark.stopTask("ServerStreamListener.halfClosed", tag); - } + callExecutor.execute(new HalfClosed()); } @Override public void closed(final Status status) { - PerfMark.startTask("ServerStreamListener.closed", tag); - try { - closedInternal(status); - } finally { - PerfMark.stopTask("ServerStreamListener.closed", tag); - } - } - - private void closedInternal(final Status status) { // For cancellations, promptly inform any users of the context that their work should be // aborted. Otherwise, we can wait until pending work is done. if (!status.isOk()) { @@ -843,7 +782,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume // is not serializing. cancelExecutor.execute(new ContextCloser(context, status.getCause())); } - final Link link = PerfMark.link(); final class Closed extends ContextRunnable { Closed() { @@ -852,13 +790,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { - PerfMark.startTask("ServerCallListener(app).closed", tag); - link.link(); - try { - getListener().closed(status); - } finally { - PerfMark.stopTask("ServerCallListener(app).closed", tag); - } + getListener().closed(status); } } @@ -867,8 +799,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void onReady() { - PerfMark.startTask("ServerStreamListener.onReady", tag); - final Link link = PerfMark.link(); final class OnReady extends ContextRunnable { OnReady() { super(context); @@ -876,8 +806,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { - PerfMark.startTask("ServerCallListener(app).onReady", tag); - link.link(); try { getListener().onReady(); } catch (RuntimeException e) { @@ -886,17 +814,11 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume } catch (Error e) { internalClose(); throw e; - } finally { - PerfMark.stopTask("ServerCallListener(app).onReady", tag); } } } - try { - callExecutor.execute(new OnReady()); - } finally { - PerfMark.stopTask("ServerStreamListener.onReady", tag); - } + callExecutor.execute(new OnReady()); } } diff --git a/core/src/main/java/io/grpc/perfmark/InternalPerfMark.java b/core/src/main/java/io/grpc/perfmark/InternalPerfMark.java new file mode 100644 index 0000000000..61a884380d --- /dev/null +++ b/core/src/main/java/io/grpc/perfmark/InternalPerfMark.java @@ -0,0 +1,49 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.perfmark; + + +/** + * Internal {@link PerfTag.TagFactory} and {@link PerfMarkTask} accessor. This is intended for use + * by io.grpc.perfmark, and the specifically supported packages that utilize PerfMark. If you + * *really* think you need to use this, contact the gRPC team first. + */ +public final class InternalPerfMark { + + private InternalPerfMark() {} + + /** Expose class to allow packages that utilize PerfMark to get PerfMarkTask instances. */ + public abstract static class InternalPerfMarkTask extends PerfMarkTask { + public InternalPerfMarkTask() {} + } + + /** Expose methods that create PerfTag to packages that utilize PerfMark. */ + private static final long NULL_NUMERIC_TAG = 0; + private static final String NULL_STRING_TAG = null; + + public static PerfTag createPerfTag(long numericTag, String stringTag) { + return PerfTag.TagFactory.create(numericTag, stringTag); + } + + public static PerfTag createPerfTag(String stringTag) { + return PerfTag.TagFactory.create(NULL_NUMERIC_TAG, stringTag); + } + + public static PerfTag createPerfTag(long numericTag) { + return PerfTag.TagFactory.create(numericTag, NULL_STRING_TAG); + } +} diff --git a/core/src/main/java/io/grpc/perfmark/PerfMark.java b/core/src/main/java/io/grpc/perfmark/PerfMark.java new file mode 100644 index 0000000000..f30363e7d4 --- /dev/null +++ b/core/src/main/java/io/grpc/perfmark/PerfMark.java @@ -0,0 +1,171 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.perfmark; + +import com.google.errorprone.annotations.CompileTimeConstant; +import io.grpc.perfmark.PerfTag.TagFactory; + +/** + * PerfMark is a collection of stub methods for marking key points in the RPC lifecycle. This + * class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this yet. + */ +public final class PerfMark { + private PerfMark() { + throw new AssertionError("nope"); + } + + /** + * Start a Task with a Tag to identify it; a task represents some work that spans some time, and + * you are interested in both its start time and end time. + * + * @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't + * use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task + * does not have a numeric tag associated. In this case, you are encouraged to use {@link + * #taskStart(String)} or {@link PerfTag#create(String)}. + * @param taskName The name of the task. This parameter must be a compile-time constant! + * Otherwise, instrumentation result will show "(invalid name)" for this task. + */ + public static void taskStart(PerfTag tag, @CompileTimeConstant String taskName) {} + + /** + * Start a Task; a task represents some work that spans some time, and you are interested in both + * its start time and end time. + * + * @param taskName The name of the task. This parameter must be a compile-time constant! + * Otherwise, instrumentation result will show "(invalid name)" for this task. + */ + public static void taskStart(@CompileTimeConstant String taskName) {} + + /** + * End a Task with a Tag to identify it; a task represents some work that spans some time, and + * you are interested in both its start time and end time. + * + * @param tag a Tag object associated with the task start. This should be the tag used for the + * corresponding {@link #taskStart(PerfTag, String)} call. + * @param taskName The name of the task. This parameter must be a compile-time constant! + * Otherwise, instrumentation result will show "(invalid name)" for this task. This should + * be the name used by the corresponding {@link #taskStart(PerfTag, String)} call. + */ + public static void taskEnd(PerfTag tag, @CompileTimeConstant String taskName) {} + + /** + * End a Task with a Tag to identify it; a task represents some work that spans some time, and + * you are interested in both its start time and end time. + * + * @param taskName The name of the task. This parameter must be a compile-time constant! + * Otherwise, instrumentation result will show "(invalid name)" for this task. This should + * be the name used by the corresponding {@link #taskStart(String)} call. + */ + public static void taskEnd(@CompileTimeConstant String taskName) {} + + /** + * Start a Task with a Tag to identify it in a try-with-resource statement; a task represents some + * work that spans some time, and you are interested in both its start time and end time. + * + *

Use this in a try-with-resource statement so that task will end automatically. + * + * @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't + * use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task + * does not have a numeric tag associated. In this case, you are encouraged to use {@link + * #task(String)} or {@link PerfTag#create(String)}. + * @param taskName The name of the task. This parameter must be a compile-time constant! + * Otherwise, instrumentation result will show "(invalid name)" for this task. + */ + public static PerfMarkTask task(PerfTag tag, @CompileTimeConstant String taskName) { + return NoopTask.INSTANCE; + } + + /** + * Start a Task it in a try-with-resource statement; a task represents some work that spans some + * time, and you are interested in both its start time and end time. + * + *

Use this in a try-with-resource statement so that task will end automatically. + * + * @param taskName The name of the task. This parameter must be a compile-time constant! + * Otherwise, instrumentation result will show "(invalid name)" for this task. + */ + public static PerfMarkTask task(@CompileTimeConstant String taskName) { + return NoopTask.INSTANCE; + } + + /** + * Records an Event with a Tag to identify it. + * + *

An Event is different from a Task in that you don't care how much time it spanned. You are + * interested in only the time it happened. + * + * @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't + * use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task + * does not have a numeric tag associated. In this case, you are encouraged to use {@link + * #event(String)} or {@link PerfTag#create(String)}. + * @param eventName The name of the event. This parameter must be a compile-time constant! + * Otherwise, instrumentation result will show "(invalid name)" for this event. + */ + public static void event(PerfTag tag, @CompileTimeConstant String eventName) {} + + /** + * Records an Event. + * + *

An Event is different from a Task in that you don't care how much time it spanned. You are + * interested in only the time it happened. + * + * @param eventName The name of the event. This parameter must be a compile-time constant! + * Otherwise, instrumentation result will show "(invalid name)" for this event. + */ + public static void event(@CompileTimeConstant String eventName) {} + + /** + * If PerfMark instrumentation is not enabled, returns a Tag with numericTag = 0L. Replacement + * for {@link TagFactory#create(long, String)} if PerfMark agent is enabled. + * + */ + public static PerfTag createTag( + @SuppressWarnings("unused") long numericTag, @SuppressWarnings("unused") String stringTag) { + // Warning suppression is safe as this method returns by default the NULL_PERF_TAG + return NULL_PERF_TAG; + } + + /** + * If PerfMark instrumentation is not enabled returns a Tag with numericTag = 0L. Replacement + * for {@link TagFactory#create(String)} if PerfMark agent is enabled. + */ + public static PerfTag createTag(@SuppressWarnings("unused") String stringTag) { + // Warning suppression is safe as this method returns by default the NULL_PERF_TAG + return NULL_PERF_TAG; + } + + /** + * If PerfMark instrumentation is not enabled returns a Tag with numericTag = 0L. Replacement + * for {@link TagFactory#create(long)} if PerfMark agent is enabled. + */ + public static PerfTag createTag(@SuppressWarnings("unused") long numericTag) { + // Warning suppression is safe as this method returns by default the NULL_PERF_TAG + return NULL_PERF_TAG; + } + + private static final PerfTag NULL_PERF_TAG = TagFactory.create(); + + private static final class NoopTask extends PerfMarkTask { + + private static final PerfMarkTask INSTANCE = new NoopTask(); + + NoopTask() {} + + @Override + public void close() {} + } +} diff --git a/core/src/main/java/io/grpc/perfmark/PerfMarkTask.java b/core/src/main/java/io/grpc/perfmark/PerfMarkTask.java new file mode 100644 index 0000000000..4a689d8ab1 --- /dev/null +++ b/core/src/main/java/io/grpc/perfmark/PerfMarkTask.java @@ -0,0 +1,31 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.perfmark; + +import java.io.Closeable; + +/** + * This class exists to make it easier for users to use the try-with-resource shorthand for + * starting and ending a PerfMark Task. This class is {@link io.grpc.Internal} and + * {@link io.grpc.ExperimentalApi}. Do not use this yet. + */ +public abstract class PerfMarkTask implements Closeable { + @Override + public abstract void close(); + + PerfMarkTask() {} +} diff --git a/core/src/main/java/io/grpc/perfmark/PerfMarker.java b/core/src/main/java/io/grpc/perfmark/PerfMarker.java new file mode 100644 index 0000000000..b39b85f526 --- /dev/null +++ b/core/src/main/java/io/grpc/perfmark/PerfMarker.java @@ -0,0 +1,71 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.perfmark; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation to add PerfMark instrumentation points surrounding method invocation. + * + *

This class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this + * yet. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +// TODO(carl-mastrangelo): Add this line back in and make it okay on Android +//@IncompatibleModifiers(value = {Modifier.ABSTRACT, Modifier.NATIVE}) +public @interface PerfMarker { + + /** + * The name of the task; e.g. `parseHeaders`. + */ + String taskName(); + + /** + * An optional computed tag. + * + *

There are 3 supported references that can be used + *

    + *
  • {@code "this"}: Then the tag will be the {@link Object#toString} of the current class. + * Only valid for instance methods. + *
  • {@code "someFieldName"}: Then the tag will be the result of + * calling {@link String#valueOf(Object)} on the field. The field cannot be a primitive or + * and array type. (Though we may revisit this in the future). + *
  • {@code "$N"}: Then the tag will be the result of calling {@link String#valueOf(Object)} + * on the Nth method parameter. Parameters are {@code 0} indexed so {@code "$1"} is the + * second parameter. The referenced parameter cannot be a primitive or an array type. + * (Though we may revisit this in the future). + *
+ * + *

In general you should reference either {@code "this"} or {@code final} fields since + * in these cases we can cache the operations to decrease the cost of computing the tags. A side + * effect of this is that for such references we will not have their tags recalculated after the + * first time. Thus it is best to use immutable objects for tags. + */ + String computedTag() default ""; + + /** + * True if class with annotation is immutable and instrumentation must adhere to this restriction. + * If enableSampling is passed as argument to the agent, instrumentation points with + * immutable = true are ignored. + */ + boolean immutable() default false; +} + diff --git a/core/src/main/java/io/grpc/perfmark/PerfTag.java b/core/src/main/java/io/grpc/perfmark/PerfTag.java new file mode 100644 index 0000000000..3dfe6e3386 --- /dev/null +++ b/core/src/main/java/io/grpc/perfmark/PerfTag.java @@ -0,0 +1,118 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.perfmark; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +/** + * A Tag is used to provide additional information to identify a task and consists of a 64-bit + * integer value and a string. + * + *

Both the {@code numericTag} and the {@code stringTag} are optional. The {@code numericTag} + * value can be used to identify the specific task being worked on (e.g. the id of the rpc call). + * The {@code stringTag} can be used to store any value that is not a compile-time constant (a + * restriction imposed for the name passed to PerfMark tasks and events). A value of 0 for the + * {@code numericTag} is considered null. Don't use 0 for the {@code numericTag} unless you intend + * to specify null. In that case you are encouraged to use {@link #create(String)}. + * + *

Invocations to {@code create} methods in this class are a no-op unless PerfMark + * instrumentation is enabled. If so, calls to {@code create} methods to this class are replaced for + * calls to {@link TagFactory} create methods. + * + *

This class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this + * yet. + */ +@Immutable +public final class PerfTag { + + private static final long NULL_NUMERIC_TAG = 0; + private static final String NULL_STRING_TAG = null; + + private final long numericTag; + private final String stringTag; + + private PerfTag(long numericTag, @Nullable String stringTag) { + this.numericTag = numericTag; + this.stringTag = stringTag; + } + + /** Returns the numeric tag if set, or {@link #NULL_NUMERIC_TAG} instead. */ + public long getNumericTag() { + return numericTag; + } + + /** Returns the string tag if set, or {@link #NULL_STRING_TAG} instead. */ + @Nullable public String getStringTag() { + return stringTag; + } + + @Override + public String toString() { + return "Tag(numericTag=" + numericTag + ",stringTag='" + stringTag + "')"; + } + + @Override + public int hashCode() { + int longHashCode = (int)(numericTag ^ (numericTag >>> 32)); + return longHashCode + (stringTag != null ? stringTag.hashCode() : 31); + } + + @Override + @SuppressWarnings("ReferenceEquality") // No Java 8 yet. + public boolean equals(Object obj) { + if (!(obj instanceof PerfTag)) { + return false; + } + PerfTag that = (PerfTag) obj; + return numericTag == that.numericTag + && (stringTag == that.stringTag || (stringTag != null && stringTag.equals(that.stringTag))); + } + + /** + * Provides methods that create Tag instances which should not be directly invoked by clients. + * + *

Calls to {@link PerfMark#create(long)}, {@link PerfMark#create(long, String)} and {@link + * PerfMark#create(String)} are replaced with calls to the methods in this class using bytecode + * rewriting, if enabled. + */ + static final class TagFactory { + /** + * This class should not be instantiated. + */ + private TagFactory() { + throw new AssertionError("nope"); + } + + public static PerfTag create(long numericTag, String stringTag) { + return new PerfTag(numericTag, stringTag); + } + + public static PerfTag create(String stringTag) { + return new PerfTag(NULL_NUMERIC_TAG, stringTag); + } + + public static PerfTag create(long numericTag) { + return new PerfTag(numericTag, NULL_STRING_TAG); + } + + static PerfTag create() { + return new PerfTag(NULL_NUMERIC_TAG, NULL_STRING_TAG); + } + } +} + diff --git a/core/src/main/java/io/grpc/perfmark/package-info.java b/core/src/main/java/io/grpc/perfmark/package-info.java new file mode 100644 index 0000000000..1f0b4dab70 --- /dev/null +++ b/core/src/main/java/io/grpc/perfmark/package-info.java @@ -0,0 +1,24 @@ +/* + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This is an internal, experimental API and not subject to the normal compatibility guarantees. + * + * @see io.grpc.Internal + */ +@javax.annotation.CheckReturnValue +@javax.annotation.ParametersAreNonnullByDefault +package io.grpc.perfmark; diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index a4c0da2d69..ea34d4ce35 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -43,7 +43,6 @@ import io.grpc.ServerCall; import io.grpc.Status; import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl; import io.grpc.internal.testing.SingleMessageProducer; -import io.perfmark.PerfMark; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.InputStreamReader; @@ -91,7 +90,7 @@ public class ServerCallImplTest { context = Context.ROOT.withCancellation(); call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer, PerfMark.createTag()); + serverCallTracer); } @Test @@ -114,7 +113,7 @@ public class ServerCallImplTest { call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - tracer, PerfMark.createTag()); + tracer); // required boilerplate call.sendHeaders(new Metadata()); @@ -225,8 +224,7 @@ public class ServerCallImplTest { context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer, - PerfMark.createTag()); + serverCallTracer); serverCall.sendHeaders(new Metadata()); serverCall.sendMessage(1L); verify(stream, times(1)).writeMessage(any(InputStream.class)); @@ -260,8 +258,7 @@ public class ServerCallImplTest { context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer, - PerfMark.createTag()); + serverCallTracer); serverCall.sendHeaders(new Metadata()); serverCall.sendMessage(1L); serverCall.sendMessage(1L); @@ -298,8 +295,7 @@ public class ServerCallImplTest { context, DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer, - PerfMark.createTag()); + serverCallTracer); serverCall.close(Status.OK, new Metadata()); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); verify(stream, times(1)).cancel(statusCaptor.capture()); diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index ea6c43a264..23042890a2 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -77,7 +77,6 @@ import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener; import io.grpc.internal.testing.SingleMessageProducer; import io.grpc.internal.testing.TestServerStreamTracer; import io.grpc.util.MutableHandlerRegistry; -import io.perfmark.PerfMark; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; @@ -1141,8 +1140,7 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation(), - PerfMark.createTag()); + Context.ROOT.withCancellation()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1167,8 +1165,7 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation(), - PerfMark.createTag()); + Context.ROOT.withCancellation()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1193,8 +1190,7 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation(), - PerfMark.createTag()); + Context.ROOT.withCancellation()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1217,8 +1213,7 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation(), - PerfMark.createTag()); + Context.ROOT.withCancellation()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1241,8 +1236,7 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation(), - PerfMark.createTag()); + Context.ROOT.withCancellation()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); @@ -1265,8 +1259,7 @@ public class ServerImplTest { executor.getScheduledExecutorService(), executor.getScheduledExecutorService(), stream, - Context.ROOT.withCancellation(), - PerfMark.createTag()); + Context.ROOT.withCancellation()); ServerStreamListener mockListener = mock(ServerStreamListener.class); listener.setListener(mockListener); diff --git a/netty/BUILD.bazel b/netty/BUILD.bazel index 9680660067..30f02bd985 100644 --- a/netty/BUILD.bazel +++ b/netty/BUILD.bazel @@ -25,6 +25,5 @@ java_library( "@io_netty_netty_handler_proxy//jar", "@io_netty_netty_resolver//jar", "@io_netty_netty_transport//jar", - "@io_perfmark_perfmark_api//jar", ], ) diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 0949faa981..ba49010c6b 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -43,7 +43,6 @@ import io.netty.channel.EventLoop; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Stream; import io.netty.util.AsciiString; -import io.perfmark.PerfMark; import javax.annotation.Nullable; /** @@ -115,18 +114,8 @@ class NettyClientStream extends AbstractClientStream { } private class Sink implements AbstractClientStream.Sink { - @Override public void writeHeaders(Metadata headers, byte[] requestPayload) { - PerfMark.startTask("NettyClientStream$Sink.writeHeaders"); - try { - writeHeadersInternal(headers, requestPayload); - } finally { - PerfMark.stopTask("NettyClientStream$Sink.writeHeaders"); - } - } - - private void writeHeadersInternal(Metadata headers, byte[] requestPayload) { // Convert the headers into Netty HTTP/2 headers. AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method); if (defaultPath == null) { @@ -163,13 +152,15 @@ class NettyClientStream extends AbstractClientStream { } } }; + // Write the command requesting the creation of the stream. writeQueue.enqueue( new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get), !method.getType().clientSendsOneMessage() || get).addListener(failureListener); } - private void writeFrameInternal( + @Override + public void writeFrame( WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) { Preconditions.checkArgument(numMessages >= 0); ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf(); @@ -193,23 +184,12 @@ class NettyClientStream extends AbstractClientStream { }); } else { // The frame is empty and will not impact outbound flow control. Just send it. - writeQueue.enqueue( - new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush); + writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush); } } @Override - public void writeFrame( - WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { - PerfMark.startTask("NettyClientStream$Sink.writeFrame"); - try { - writeFrameInternal(frame, endOfStream, flush, numMessages); - } finally { - PerfMark.stopTask("NettyClientStream$Sink.writeFrame"); - } - } - - private void requestInternal(final int numMessages) { + public void request(final int numMessages) { if (channel.eventLoop().inEventLoop()) { // Processing data read in the event loop so can call into the deframer immediately transportState().requestMessagesFromDeframer(numMessages); @@ -223,24 +203,9 @@ class NettyClientStream extends AbstractClientStream { } } - @Override - public void request(int numMessages) { - PerfMark.startTask("NettyClientStream$Sink.request"); - try { - requestInternal(numMessages); - } finally { - PerfMark.stopTask("NettyClientStream$Sink.request"); - } - } - @Override public void cancel(Status status) { - PerfMark.startTask("NettyClientStream$Sink.cancel"); - try { - writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true); - } finally { - PerfMark.stopTask("NettyClientStream$Sink.cancel"); - } + writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true); } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index 64ab6a3bda..e9d221aa28 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -33,7 +33,6 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.EventLoop; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Stream; -import io.perfmark.PerfMark; import java.util.logging.Level; import java.util.logging.Logger; @@ -92,8 +91,8 @@ class NettyServerStream extends AbstractServerStream { } private class Sink implements AbstractServerStream.Sink { - - private void requestInternal(final int numMessages) { + @Override + public void request(final int numMessages) { if (channel.eventLoop().inEventLoop()) { // Processing data read in the event loop so can call into the deframer immediately transportState().requestMessagesFromDeframer(numMessages); @@ -108,30 +107,16 @@ class NettyServerStream extends AbstractServerStream { } @Override - public void request(final int numMessages) { - PerfMark.startTask("NettyServerStream$Sink.request"); - try { - requestInternal(numMessages); - } finally { - PerfMark.stopTask("NettyServerStream$Sink.request"); - } + public void writeHeaders(Metadata headers) { + writeQueue.enqueue( + SendResponseHeadersCommand.createHeaders( + transportState(), + Utils.convertServerHeaders(headers)), + true); } @Override - public void writeHeaders(Metadata headers) { - PerfMark.startTask("NettyServerStream$Sink.writeHeaders"); - try { - writeQueue.enqueue( - SendResponseHeadersCommand.createHeaders( - transportState(), - Utils.convertServerHeaders(headers)), - true); - } finally { - PerfMark.stopTask("NettyServerStream$Sink.writeHeaders"); - } - } - - private void writeFrameInternal(WritableBuffer frame, boolean flush, final int numMessages) { + public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) { Preconditions.checkArgument(numMessages >= 0); if (frame == null) { writeQueue.scheduleFlush(); @@ -155,37 +140,17 @@ class NettyServerStream extends AbstractServerStream { }); } - @Override - public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) { - PerfMark.startTask("NettyServerStream$Sink.writeFrame"); - try { - writeFrameInternal(frame, flush, numMessages); - } finally { - PerfMark.stopTask("NettyServerStream$Sink.writeFrame"); - } - } - @Override public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { - PerfMark.startTask("NettyServerStream$Sink.writeTrailers"); - try { - Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent); - writeQueue.enqueue( - SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status), - true); - } finally { - PerfMark.stopTask("NettyServerStream$Sink.writeTrailers"); - } + Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent); + writeQueue.enqueue( + SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status), + true); } @Override public void cancel(Status status) { - PerfMark.startTask("NettyServerStream$Sink.cancel"); - try { - writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true); - } finally { - PerfMark.startTask("NettyServerStream$Sink.cancel"); - } + writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true); } } diff --git a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java index 4a343e8d6e..71e6bcfe29 100644 --- a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java +++ b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java @@ -21,8 +21,6 @@ import io.netty.buffer.ByteBufHolder; import io.netty.buffer.DefaultByteBufHolder; import io.netty.channel.Channel; import io.netty.channel.ChannelPromise; -import io.perfmark.Link; -import io.perfmark.PerfMark; /** * Command sent from the transport to the Netty channel to send a GRPC frame to the remote endpoint. @@ -30,7 +28,6 @@ import io.perfmark.PerfMark; final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.QueuedCommand { private final StreamIdHolder stream; private final boolean endStream; - private final Link link; private ChannelPromise promise; @@ -38,12 +35,6 @@ final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQu super(content); this.stream = stream; this.endStream = endStream; - this.link = PerfMark.link(); - } - - @Override - public Link getLink() { - return link; } int streamId() { diff --git a/netty/src/main/java/io/grpc/netty/WriteQueue.java b/netty/src/main/java/io/grpc/netty/WriteQueue.java index f3ef7ce0e1..24063ad181 100644 --- a/netty/src/main/java/io/grpc/netty/WriteQueue.java +++ b/netty/src/main/java/io/grpc/netty/WriteQueue.java @@ -22,8 +22,6 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; -import io.perfmark.Link; -import io.perfmark.PerfMark; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -106,44 +104,26 @@ class WriteQueue { * called in the event loop */ private void flush() { - PerfMark.startTask("WriteQueue.periodicFlush"); try { QueuedCommand cmd; int i = 0; boolean flushedOnce = false; while ((cmd = queue.poll()) != null) { - PerfMark.startTask("WriteQueue.run"); - try { - cmd.getLink().link(); - cmd.run(channel); - } finally { - PerfMark.stopTask("WriteQueue.run"); - } + cmd.run(channel); if (++i == DEQUE_CHUNK_SIZE) { i = 0; // Flush each chunk so we are releasing buffers periodically. In theory this loop // might never end as new events are continuously added to the queue, if we never // flushed in that case we would be guaranteed to OOM. - PerfMark.startTask("WriteQueue.flush0"); - try { - channel.flush(); - } finally { - PerfMark.stopTask("WriteQueue.flush0"); - } + channel.flush(); flushedOnce = true; } } // Must flush at least once, even if there were no writes. if (i != 0 || !flushedOnce) { - PerfMark.startTask("WriteQueue.flush1"); - try { - channel.flush(); - } finally { - PerfMark.stopTask("WriteQueue.flush1"); - } + channel.flush(); } } finally { - PerfMark.stopTask("WriteQueue.periodicFlush"); // Mark the write as done, if the queue is non-empty after marking trigger a new write. scheduled.set(false); if (!queue.isEmpty()) { @@ -154,10 +134,8 @@ class WriteQueue { private static class RunnableCommand implements QueuedCommand { private final Runnable runnable; - private final Link link; public RunnableCommand(Runnable runnable) { - this.link = PerfMark.link(); this.runnable = runnable; } @@ -175,21 +153,11 @@ class WriteQueue { public final void run(Channel channel) { runnable.run(); } - - @Override - public Link getLink() { - return link; - } } abstract static class AbstractQueuedCommand implements QueuedCommand { private ChannelPromise promise; - private final Link link; - - AbstractQueuedCommand() { - this.link = PerfMark.link(); - } @Override public final void promise(ChannelPromise promise) { @@ -205,11 +173,6 @@ class WriteQueue { public final void run(Channel channel) { channel.write(this, promise); } - - @Override - public Link getLink() { - return link; - } } /** @@ -227,7 +190,5 @@ class WriteQueue { void promise(ChannelPromise promise); void run(Channel channel); - - Link getLink(); } } diff --git a/repositories.bzl b/repositories.bzl index c77df33038..4fd98cec56 100644 --- a/repositories.bzl +++ b/repositories.bzl @@ -35,7 +35,6 @@ def grpc_java_repositories( omit_io_netty_tcnative_boringssl_static = False, omit_io_opencensus_api = False, omit_io_opencensus_grpc_metrics = False, - omit_io_perfmark = False, omit_javax_annotation = False, omit_junit_junit = False, omit_net_zlib = False, @@ -104,8 +103,6 @@ def grpc_java_repositories( io_opencensus_api() if not omit_io_opencensus_grpc_metrics: io_opencensus_grpc_metrics() - if not omit_io_perfmark: - io_perfmark() if not omit_javax_annotation: javax_annotation() if not omit_junit_junit: @@ -405,15 +402,6 @@ def io_opencensus_grpc_metrics(): licenses = ["notice"], # Apache 2.0 ) -def io_perfmark(): - jvm_maven_import_external( - name = "io_perfmark_perfmark_api", - artifact = "io.perfmark:perfmark-api:0.16.0", - server_urls = ["http://central.maven.org/maven2"], - artifact_sha256 = "a93667875ea9d10315177768739a18d6c667df041c982d2841645ae8558d0af0", - licenses = ["notice"], # Apache 2.0 - ) - def javax_annotation(): # Use //stub:javax_annotation for neverlink=1 support. jvm_maven_import_external(