mirror of https://github.com/grpc/grpc-java.git
This reverts commit 2db3abc9ad.
This commit is contained in:
parent
b836b36777
commit
e5bd7f282c
|
|
@ -201,6 +201,7 @@ subprojects {
|
||||||
opencensus_impl: "io.opencensus:opencensus-impl:${opencensusVersion}",
|
opencensus_impl: "io.opencensus:opencensus-impl:${opencensusVersion}",
|
||||||
opencensus_impl_lite: "io.opencensus:opencensus-impl-lite:${opencensusVersion}",
|
opencensus_impl_lite: "io.opencensus:opencensus-impl-lite:${opencensusVersion}",
|
||||||
instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3',
|
instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3',
|
||||||
|
perfmark: 'io.perfmark:perfmark-api:0.16.0',
|
||||||
protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}",
|
protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}",
|
||||||
protobuf_lite: "com.google.protobuf:protobuf-lite:3.0.1",
|
protobuf_lite: "com.google.protobuf:protobuf-lite:3.0.1",
|
||||||
protoc_lite: "com.google.protobuf:protoc-gen-javalite:3.0.0",
|
protoc_lite: "com.google.protobuf:protoc-gen-javalite:3.0.0",
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,3 @@
|
||||||
PERFMARK_INTERNAL_ACCESSOR_SRCS = glob(
|
|
||||||
[
|
|
||||||
"src/main/java/io/grpc/perfmark/Internal*.java",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
PERFMARK_SRCS = glob(
|
|
||||||
[
|
|
||||||
"src/main/java/io/grpc/perfmark/*.java",
|
|
||||||
],
|
|
||||||
exclude = PERFMARK_INTERNAL_ACCESSOR_SRCS,
|
|
||||||
)
|
|
||||||
|
|
||||||
java_library(
|
java_library(
|
||||||
name = "core",
|
name = "core",
|
||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
|
|
@ -43,7 +30,6 @@ java_library(
|
||||||
]),
|
]),
|
||||||
visibility = ["//:__subpackages__"],
|
visibility = ["//:__subpackages__"],
|
||||||
deps = [
|
deps = [
|
||||||
":perfmark",
|
|
||||||
"//api",
|
"//api",
|
||||||
"//context",
|
"//context",
|
||||||
"@com_google_android_annotations//jar",
|
"@com_google_android_annotations//jar",
|
||||||
|
|
@ -54,6 +40,7 @@ java_library(
|
||||||
"@com_google_j2objc_j2objc_annotations//jar",
|
"@com_google_j2objc_j2objc_annotations//jar",
|
||||||
"@io_opencensus_opencensus_api//jar",
|
"@io_opencensus_opencensus_api//jar",
|
||||||
"@io_opencensus_opencensus_contrib_grpc_metrics//jar",
|
"@io_opencensus_opencensus_contrib_grpc_metrics//jar",
|
||||||
|
"@io_perfmark_perfmark_api//jar",
|
||||||
"@org_codehaus_mojo_animal_sniffer_annotations//jar",
|
"@org_codehaus_mojo_animal_sniffer_annotations//jar",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
@ -76,12 +63,3 @@ java_library(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
java_library(
|
|
||||||
name = "perfmark",
|
|
||||||
srcs = PERFMARK_SRCS,
|
|
||||||
visibility = ["//:__subpackages__"],
|
|
||||||
deps = [
|
|
||||||
"@com_google_code_findbugs_jsr305//jar",
|
|
||||||
"@com_google_errorprone_error_prone_annotations//jar",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ dependencies {
|
||||||
compile project(':grpc-api'),
|
compile project(':grpc-api'),
|
||||||
libraries.gson,
|
libraries.gson,
|
||||||
libraries.android_annotations,
|
libraries.android_annotations,
|
||||||
|
libraries.perfmark
|
||||||
compile (libraries.opencensus_api) {
|
compile (libraries.opencensus_api) {
|
||||||
// prefer 3.0.2 from libraries instead of 3.0.1
|
// prefer 3.0.2 from libraries instead of 3.0.1
|
||||||
exclude group: 'com.google.code.findbugs', module: 'jsr305'
|
exclude group: 'com.google.code.findbugs', module: 'jsr305'
|
||||||
|
|
|
||||||
|
|
@ -46,8 +46,9 @@ import io.grpc.Metadata;
|
||||||
import io.grpc.MethodDescriptor;
|
import io.grpc.MethodDescriptor;
|
||||||
import io.grpc.MethodDescriptor.MethodType;
|
import io.grpc.MethodDescriptor.MethodType;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.perfmark.PerfMark;
|
import io.perfmark.Link;
|
||||||
import io.grpc.perfmark.PerfTag;
|
import io.perfmark.PerfMark;
|
||||||
|
import io.perfmark.Tag;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
|
|
@ -69,7 +70,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
= "gzip".getBytes(Charset.forName("US-ASCII"));
|
= "gzip".getBytes(Charset.forName("US-ASCII"));
|
||||||
|
|
||||||
private final MethodDescriptor<ReqT, RespT> method;
|
private final MethodDescriptor<ReqT, RespT> method;
|
||||||
private final PerfTag tag;
|
private final Tag tag;
|
||||||
private final Executor callExecutor;
|
private final Executor callExecutor;
|
||||||
private final CallTracer channelCallsTracer;
|
private final CallTracer channelCallsTracer;
|
||||||
private final Context context;
|
private final Context context;
|
||||||
|
|
@ -96,7 +97,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
boolean retryEnabled) {
|
boolean retryEnabled) {
|
||||||
this.method = method;
|
this.method = method;
|
||||||
// TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
|
// TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
|
||||||
this.tag = PerfMark.createTag(method.getFullMethodName());
|
this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
|
||||||
// If we know that the executor is a direct executor, we don't need to wrap it with a
|
// If we know that the executor is a direct executor, we don't need to wrap it with a
|
||||||
// SerializingExecutor. This is purely for performance reasons.
|
// SerializingExecutor. This is purely for performance reasons.
|
||||||
// See https://github.com/grpc/grpc-java/issues/368
|
// See https://github.com/grpc/grpc-java/issues/368
|
||||||
|
|
@ -112,6 +113,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
this.clientTransportProvider = clientTransportProvider;
|
this.clientTransportProvider = clientTransportProvider;
|
||||||
this.deadlineCancellationExecutor = deadlineCancellationExecutor;
|
this.deadlineCancellationExecutor = deadlineCancellationExecutor;
|
||||||
this.retryEnabled = retryEnabled;
|
this.retryEnabled = retryEnabled;
|
||||||
|
PerfMark.event("ClientCall.<init>", tag);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class ContextCancellationListener implements CancellationListener {
|
private final class ContextCancellationListener implements CancellationListener {
|
||||||
|
|
@ -183,11 +185,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start(Listener<RespT> observer, Metadata headers) {
|
public void start(Listener<RespT> observer, Metadata headers) {
|
||||||
PerfMark.taskStart(tag, "ClientCall.start");
|
PerfMark.startTask("ClientCall.start", tag);
|
||||||
try {
|
try {
|
||||||
startInternal(observer, headers);
|
startInternal(observer, headers);
|
||||||
} finally {
|
} finally {
|
||||||
PerfMark.taskEnd(tag, "ClientCall.start");
|
PerfMark.stopTask("ClientCall.start", tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -378,18 +380,23 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void request(int numMessages) {
|
public void request(int numMessages) {
|
||||||
|
PerfMark.startTask("ClientCall.request", tag);
|
||||||
|
try {
|
||||||
checkState(stream != null, "Not started");
|
checkState(stream != null, "Not started");
|
||||||
checkArgument(numMessages >= 0, "Number requested must be non-negative");
|
checkArgument(numMessages >= 0, "Number requested must be non-negative");
|
||||||
stream.request(numMessages);
|
stream.request(numMessages);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ClientCall.cancel", tag);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancel(@Nullable String message, @Nullable Throwable cause) {
|
public void cancel(@Nullable String message, @Nullable Throwable cause) {
|
||||||
PerfMark.taskStart(tag, "ClientCall.cancel");
|
PerfMark.startTask("ClientCall.cancel", tag);
|
||||||
try {
|
try {
|
||||||
cancelInternal(message, cause);
|
cancelInternal(message, cause);
|
||||||
} finally {
|
} finally {
|
||||||
PerfMark.taskEnd(tag, "ClientCall.cancel");
|
PerfMark.stopTask("ClientCall.cancel", tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -424,11 +431,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void halfClose() {
|
public void halfClose() {
|
||||||
PerfMark.taskStart(tag, "ClientCall.halfClose");
|
PerfMark.startTask("ClientCall.halfClose", tag);
|
||||||
try {
|
try {
|
||||||
halfCloseInternal();
|
halfCloseInternal();
|
||||||
} finally {
|
} finally {
|
||||||
PerfMark.taskEnd(tag, "ClientCall.halfClose");
|
PerfMark.stopTask("ClientCall.halfClose", tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -442,11 +449,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendMessage(ReqT message) {
|
public void sendMessage(ReqT message) {
|
||||||
PerfMark.taskStart(tag, "ClientCall.sendMessage");
|
PerfMark.startTask("ClientCall.sendMessage", tag);
|
||||||
try {
|
try {
|
||||||
sendMessageInternal(message);
|
sendMessageInternal(message);
|
||||||
} finally {
|
} finally {
|
||||||
PerfMark.taskEnd(tag, "ClientCall.sendMessage");
|
PerfMark.stopTask("ClientCall.sendMessage", tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -515,17 +522,29 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void headersRead(final Metadata headers) {
|
public void headersRead(final Metadata headers) {
|
||||||
|
PerfMark.startTask("ClientStreamListener.headersRead", tag);
|
||||||
|
final Link link = PerfMark.link();
|
||||||
|
|
||||||
final class HeadersRead extends ContextRunnable {
|
final class HeadersRead extends ContextRunnable {
|
||||||
HeadersRead() {
|
HeadersRead() {
|
||||||
super(context);
|
super(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void runInContext() {
|
public void runInContext() {
|
||||||
|
PerfMark.startTask("ClientCall$Listener.headersRead", tag);
|
||||||
|
link.link();
|
||||||
|
try {
|
||||||
|
runInternal();
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ClientCall$Listener.headersRead", tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runInternal() {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
PerfMark.taskStart(tag, "ClientCall.headersRead");
|
|
||||||
try {
|
try {
|
||||||
observer.onHeaders(headers);
|
observer.onHeaders(headers);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
|
@ -533,29 +552,43 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
Status.CANCELLED.withCause(t).withDescription("Failed to read headers");
|
Status.CANCELLED.withCause(t).withDescription("Failed to read headers");
|
||||||
stream.cancel(status);
|
stream.cancel(status);
|
||||||
close(status, new Metadata());
|
close(status, new Metadata());
|
||||||
} finally {
|
|
||||||
PerfMark.taskEnd(tag, "ClientCall.headersRead");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
callExecutor.execute(new HeadersRead());
|
callExecutor.execute(new HeadersRead());
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ClientStreamListener.headersRead", tag);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void messagesAvailable(final MessageProducer producer) {
|
public void messagesAvailable(final MessageProducer producer) {
|
||||||
|
PerfMark.startTask("ClientStreamListener.messagesAvailable", tag);
|
||||||
|
final Link link = PerfMark.link();
|
||||||
|
|
||||||
final class MessagesAvailable extends ContextRunnable {
|
final class MessagesAvailable extends ContextRunnable {
|
||||||
MessagesAvailable() {
|
MessagesAvailable() {
|
||||||
super(context);
|
super(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void runInContext() {
|
public void runInContext() {
|
||||||
|
PerfMark.startTask("ClientCall$Listener.messagesAvailable", tag);
|
||||||
|
link.link();
|
||||||
|
try {
|
||||||
|
runInternal();
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ClientCall$Listener.messagesAvailable", tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runInternal() {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
GrpcUtil.closeQuietly(producer);
|
GrpcUtil.closeQuietly(producer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
PerfMark.taskStart(tag, "ClientCall.messagesAvailable");
|
|
||||||
try {
|
try {
|
||||||
InputStream message;
|
InputStream message;
|
||||||
while ((message = producer.next()) != null) {
|
while ((message = producer.next()) != null) {
|
||||||
|
|
@ -573,13 +606,15 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
Status.CANCELLED.withCause(t).withDescription("Failed to read message.");
|
Status.CANCELLED.withCause(t).withDescription("Failed to read message.");
|
||||||
stream.cancel(status);
|
stream.cancel(status);
|
||||||
close(status, new Metadata());
|
close(status, new Metadata());
|
||||||
} finally {
|
|
||||||
PerfMark.taskEnd(tag, "ClientCall.messagesAvailable");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
callExecutor.execute(new MessagesAvailable());
|
callExecutor.execute(new MessagesAvailable());
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ClientStreamListener.messagesAvailable", tag);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -603,6 +638,16 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
|
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
|
||||||
|
PerfMark.startTask("ClientStreamListener.closed", tag);
|
||||||
|
try {
|
||||||
|
closedInternal(status, rpcProgress, trailers);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ClientStreamListener.closed", tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void closedInternal(
|
||||||
|
Status status, @SuppressWarnings("unused") RpcProgress rpcProgress, Metadata trailers) {
|
||||||
Deadline deadline = effectiveDeadline();
|
Deadline deadline = effectiveDeadline();
|
||||||
if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
|
if (status.getCode() == Status.Code.CANCELLED && deadline != null) {
|
||||||
// When the server's deadline expires, it can only reset the stream with CANCEL and no
|
// When the server's deadline expires, it can only reset the stream with CANCEL and no
|
||||||
|
|
@ -616,23 +661,29 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
}
|
}
|
||||||
final Status savedStatus = status;
|
final Status savedStatus = status;
|
||||||
final Metadata savedTrailers = trailers;
|
final Metadata savedTrailers = trailers;
|
||||||
|
final Link link = PerfMark.link();
|
||||||
final class StreamClosed extends ContextRunnable {
|
final class StreamClosed extends ContextRunnable {
|
||||||
StreamClosed() {
|
StreamClosed() {
|
||||||
super(context);
|
super(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void runInContext() {
|
public void runInContext() {
|
||||||
|
PerfMark.startTask("ClientCall$Listener.onClose", tag);
|
||||||
|
link.link();
|
||||||
|
try {
|
||||||
|
runInternal();
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ClientCall$Listener.onClose", tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runInternal() {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
// We intentionally don't keep the status or metadata from the server.
|
// We intentionally don't keep the status or metadata from the server.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
PerfMark.taskStart(tag, "ClientCall.closed");
|
|
||||||
try {
|
|
||||||
close(savedStatus, savedTrailers);
|
close(savedStatus, savedTrailers);
|
||||||
} finally {
|
|
||||||
PerfMark.taskEnd(tag, "ClientCall.closed");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -641,14 +692,26 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onReady() {
|
public void onReady() {
|
||||||
|
PerfMark.startTask("ClientStreamListener.onReady", tag);
|
||||||
|
final Link link = PerfMark.link();
|
||||||
|
|
||||||
final class StreamOnReady extends ContextRunnable {
|
final class StreamOnReady extends ContextRunnable {
|
||||||
StreamOnReady() {
|
StreamOnReady() {
|
||||||
super(context);
|
super(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void runInContext() {
|
public void runInContext() {
|
||||||
PerfMark.taskStart(tag, "ClientCall.onReady");
|
PerfMark.startTask("ClientCall$Listener.onReady", tag);
|
||||||
|
link.link();
|
||||||
|
try {
|
||||||
|
runInternal();
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ClientCall$Listener.onReady", tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runInternal() {
|
||||||
try {
|
try {
|
||||||
observer.onReady();
|
observer.onReady();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
|
@ -656,13 +719,15 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
||||||
Status.CANCELLED.withCause(t).withDescription("Failed to call onReady.");
|
Status.CANCELLED.withCause(t).withDescription("Failed to call onReady.");
|
||||||
stream.cancel(status);
|
stream.cancel(status);
|
||||||
close(status, new Metadata());
|
close(status, new Metadata());
|
||||||
} finally {
|
|
||||||
PerfMark.taskEnd(tag, "ClientCall.onReady");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
callExecutor.execute(new StreamOnReady());
|
callExecutor.execute(new StreamOnReady());
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ClientStreamListener.onReady", tag);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -37,8 +37,8 @@ import io.grpc.Metadata;
|
||||||
import io.grpc.MethodDescriptor;
|
import io.grpc.MethodDescriptor;
|
||||||
import io.grpc.ServerCall;
|
import io.grpc.ServerCall;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.perfmark.PerfMark;
|
import io.perfmark.PerfMark;
|
||||||
import io.grpc.perfmark.PerfTag;
|
import io.perfmark.Tag;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
@ -54,7 +54,7 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
|
||||||
|
|
||||||
private final ServerStream stream;
|
private final ServerStream stream;
|
||||||
private final MethodDescriptor<ReqT, RespT> method;
|
private final MethodDescriptor<ReqT, RespT> method;
|
||||||
private final PerfTag tag;
|
private final Tag tag;
|
||||||
private final Context.CancellableContext context;
|
private final Context.CancellableContext context;
|
||||||
private final byte[] messageAcceptEncoding;
|
private final byte[] messageAcceptEncoding;
|
||||||
private final DecompressorRegistry decompressorRegistry;
|
private final DecompressorRegistry decompressorRegistry;
|
||||||
|
|
@ -71,31 +71,35 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
|
||||||
ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method,
|
ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method,
|
||||||
Metadata inboundHeaders, Context.CancellableContext context,
|
Metadata inboundHeaders, Context.CancellableContext context,
|
||||||
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
|
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
|
||||||
CallTracer serverCallTracer) {
|
CallTracer serverCallTracer, Tag tag) {
|
||||||
this.stream = stream;
|
this.stream = stream;
|
||||||
this.method = method;
|
this.method = method;
|
||||||
// TODO(carl-mastrangelo): consider moving this to the ServerImpl to record startCall.
|
|
||||||
this.tag = PerfMark.createTag(method.getFullMethodName());
|
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);
|
this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);
|
||||||
this.decompressorRegistry = decompressorRegistry;
|
this.decompressorRegistry = decompressorRegistry;
|
||||||
this.compressorRegistry = compressorRegistry;
|
this.compressorRegistry = compressorRegistry;
|
||||||
this.serverCallTracer = serverCallTracer;
|
this.serverCallTracer = serverCallTracer;
|
||||||
this.serverCallTracer.reportCallStarted();
|
this.serverCallTracer.reportCallStarted();
|
||||||
|
this.tag = tag;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void request(int numMessages) {
|
public void request(int numMessages) {
|
||||||
|
PerfMark.startTask("ServerCall.request", tag);
|
||||||
|
try {
|
||||||
stream.request(numMessages);
|
stream.request(numMessages);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerCall.request", tag);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendHeaders(Metadata headers) {
|
public void sendHeaders(Metadata headers) {
|
||||||
PerfMark.taskStart(tag, "ServerCall.sendHeaders");
|
PerfMark.startTask("ServerCall.sendHeaders", tag);
|
||||||
try {
|
try {
|
||||||
sendHeadersInternal(headers);
|
sendHeadersInternal(headers);
|
||||||
} finally {
|
} finally {
|
||||||
PerfMark.taskEnd(tag, "ServerCall.sendHeaders");
|
PerfMark.stopTask("ServerCall.sendHeaders", tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -140,11 +144,11 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendMessage(RespT message) {
|
public void sendMessage(RespT message) {
|
||||||
PerfMark.taskStart(tag, "ServerCall.sendMessage");
|
PerfMark.startTask("ServerCall.sendMessage", tag);
|
||||||
try {
|
try {
|
||||||
sendMessageInternal(message);
|
sendMessageInternal(message);
|
||||||
} finally {
|
} finally {
|
||||||
PerfMark.taskEnd(tag, "ServerCall.sendMessage");
|
PerfMark.stopTask("ServerCall.sendMessage", tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -193,11 +197,11 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close(Status status, Metadata trailers) {
|
public void close(Status status, Metadata trailers) {
|
||||||
PerfMark.taskStart(tag, "ServerCall.close");
|
PerfMark.startTask("ServerCall.close", tag);
|
||||||
try {
|
try {
|
||||||
closeInternal(status, trailers);
|
closeInternal(status, trailers);
|
||||||
} finally {
|
} finally {
|
||||||
PerfMark.taskEnd(tag, "ServerCall.close");
|
PerfMark.stopTask("ServerCall.close", tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -281,15 +285,23 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
|
||||||
MoreExecutors.directExecutor());
|
MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try
|
|
||||||
@Override
|
@Override
|
||||||
public void messagesAvailable(final MessageProducer producer) {
|
public void messagesAvailable(MessageProducer producer) {
|
||||||
|
PerfMark.startTask("ServerStreamListener.messagesAvailable", call.tag);
|
||||||
|
try {
|
||||||
|
messagesAvailableInternal(producer);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerStreamListener.messagesAvailable", call.tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try
|
||||||
|
private void messagesAvailableInternal(final MessageProducer producer) {
|
||||||
if (call.cancelled) {
|
if (call.cancelled) {
|
||||||
GrpcUtil.closeQuietly(producer);
|
GrpcUtil.closeQuietly(producer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
PerfMark.taskStart(call.tag, "ServerCall.messagesAvailable");
|
|
||||||
InputStream message;
|
InputStream message;
|
||||||
try {
|
try {
|
||||||
while ((message = producer.next()) != null) {
|
while ((message = producer.next()) != null) {
|
||||||
|
|
@ -305,30 +317,34 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
|
||||||
GrpcUtil.closeQuietly(producer);
|
GrpcUtil.closeQuietly(producer);
|
||||||
Throwables.throwIfUnchecked(t);
|
Throwables.throwIfUnchecked(t);
|
||||||
throw new RuntimeException(t);
|
throw new RuntimeException(t);
|
||||||
} finally {
|
|
||||||
PerfMark.taskEnd(call.tag, "ServerCall.messagesAvailable");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void halfClosed() {
|
public void halfClosed() {
|
||||||
|
PerfMark.startTask("ServerStreamListener.halfClosed", call.tag);
|
||||||
|
try {
|
||||||
if (call.cancelled) {
|
if (call.cancelled) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
PerfMark.taskStart(call.tag, "ServerCall.halfClosed");
|
|
||||||
|
|
||||||
try {
|
|
||||||
listener.onHalfClose();
|
listener.onHalfClose();
|
||||||
} finally {
|
} finally {
|
||||||
PerfMark.taskEnd(call.tag, "ServerCall.halfClosed");
|
PerfMark.stopTask("ServerStreamListener.halfClosed", call.tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closed(Status status) {
|
public void closed(Status status) {
|
||||||
PerfMark.taskStart(call.tag, "ServerCall.closed");
|
PerfMark.startTask("ServerStreamListener.closed", call.tag);
|
||||||
try {
|
try {
|
||||||
|
closedInternal(status);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerStreamListener.closed", call.tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void closedInternal(Status status) {
|
||||||
try {
|
try {
|
||||||
if (status.isOk()) {
|
if (status.isOk()) {
|
||||||
listener.onComplete();
|
listener.onComplete();
|
||||||
|
|
@ -340,23 +356,19 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
|
||||||
// Cancel context after delivering RPC closure notification to allow the application to
|
// Cancel context after delivering RPC closure notification to allow the application to
|
||||||
// clean up and update any state based on whether onComplete or onCancel was called.
|
// clean up and update any state based on whether onComplete or onCancel was called.
|
||||||
context.cancel(null);
|
context.cancel(null);
|
||||||
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
PerfMark.taskEnd(call.tag, "ServerCall.closed");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onReady() {
|
public void onReady() {
|
||||||
|
PerfMark.startTask("ServerStreamListener.onReady", call.tag);
|
||||||
|
try {
|
||||||
if (call.cancelled) {
|
if (call.cancelled) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
PerfMark.taskStart(call.tag, "ServerCall.closed");
|
|
||||||
try {
|
|
||||||
listener.onReady();
|
listener.onReady();
|
||||||
} finally {
|
} finally {
|
||||||
PerfMark.taskEnd(call.tag, "ServerCall.closed");
|
PerfMark.stopTask("ServerCall.closed", call.tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,9 @@ import io.grpc.ServerMethodDefinition;
|
||||||
import io.grpc.ServerServiceDefinition;
|
import io.grpc.ServerServiceDefinition;
|
||||||
import io.grpc.ServerTransportFilter;
|
import io.grpc.ServerTransportFilter;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
|
import io.perfmark.Link;
|
||||||
|
import io.perfmark.PerfMark;
|
||||||
|
import io.perfmark.Tag;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
|
@ -460,9 +463,21 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
transportClosed(transport);
|
transportClosed(transport);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void streamCreated(
|
public void streamCreated(ServerStream stream, String methodName, Metadata headers) {
|
||||||
final ServerStream stream, final String methodName, final Metadata headers) {
|
Tag tag = PerfMark.createTag(methodName, stream.hashCode());
|
||||||
|
PerfMark.startTask("ServerTransportListener.streamCreated", tag);
|
||||||
|
try {
|
||||||
|
streamCreatedInternal(stream, methodName, headers, tag);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerTransportListener.streamCreated", tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void streamCreatedInternal(
|
||||||
|
final ServerStream stream, final String methodName, final Metadata headers, final Tag tag) {
|
||||||
|
|
||||||
if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
|
if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
|
||||||
String encoding = headers.get(MESSAGE_ENCODING_KEY);
|
String encoding = headers.get(MESSAGE_ENCODING_KEY);
|
||||||
Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding);
|
Decompressor decompressor = decompressorRegistry.lookupDecompressor(encoding);
|
||||||
|
|
@ -489,22 +504,33 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
wrappedExecutor = new SerializingExecutor(executor);
|
wrappedExecutor = new SerializingExecutor(executor);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final Link link = PerfMark.link();
|
||||||
|
|
||||||
final JumpToApplicationThreadServerStreamListener jumpListener
|
final JumpToApplicationThreadServerStreamListener jumpListener
|
||||||
= new JumpToApplicationThreadServerStreamListener(
|
= new JumpToApplicationThreadServerStreamListener(
|
||||||
wrappedExecutor, executor, stream, context);
|
wrappedExecutor, executor, stream, context, tag);
|
||||||
stream.setListener(jumpListener);
|
stream.setListener(jumpListener);
|
||||||
// Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks
|
// Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks
|
||||||
// are delivered, including any errors. Callbacks can still be triggered, but they will be
|
// are delivered, including any errors. Callbacks can still be triggered, but they will be
|
||||||
// queued.
|
// queued.
|
||||||
|
|
||||||
final class StreamCreated extends ContextRunnable {
|
final class StreamCreated extends ContextRunnable {
|
||||||
|
|
||||||
StreamCreated() {
|
StreamCreated() {
|
||||||
super(context);
|
super(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void runInContext() {
|
public void runInContext() {
|
||||||
|
PerfMark.startTask("ServerTransportListener$StreamCreated.startCall", tag);
|
||||||
|
link.link();
|
||||||
|
try {
|
||||||
|
runInternal();
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerTransportListener$StreamCreated.startCall", tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void runInternal() {
|
||||||
ServerStreamListener listener = NOOP_LISTENER;
|
ServerStreamListener listener = NOOP_LISTENER;
|
||||||
try {
|
try {
|
||||||
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
|
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
|
||||||
|
|
@ -523,7 +549,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
context.cancel(null);
|
context.cancel(null);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx);
|
listener =
|
||||||
|
startCall(stream, methodName, method, headers, context, statsTraceCtx, tag);
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
stream.close(Status.fromThrowable(e), new Metadata());
|
stream.close(Status.fromThrowable(e), new Metadata());
|
||||||
context.cancel(null);
|
context.cancel(null);
|
||||||
|
|
@ -573,7 +600,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
/** Never returns {@code null}. */
|
/** Never returns {@code null}. */
|
||||||
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
|
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
|
||||||
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
|
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
|
||||||
Context.CancellableContext context, StatsTraceContext statsTraceCtx) {
|
Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) {
|
||||||
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
|
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
|
||||||
statsTraceCtx.serverCallStarted(
|
statsTraceCtx.serverCallStarted(
|
||||||
new ServerCallInfoImpl<>(
|
new ServerCallInfoImpl<>(
|
||||||
|
|
@ -587,7 +614,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
|
ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
|
||||||
ServerMethodDefinition<?, ?> wMethodDef = binlog == null
|
ServerMethodDefinition<?, ?> wMethodDef = binlog == null
|
||||||
? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
|
? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
|
||||||
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context);
|
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag);
|
||||||
}
|
}
|
||||||
|
|
||||||
private <WReqT, WRespT> ServerStreamListener startWrappedCall(
|
private <WReqT, WRespT> ServerStreamListener startWrappedCall(
|
||||||
|
|
@ -595,7 +622,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
ServerMethodDefinition<WReqT, WRespT> methodDef,
|
ServerMethodDefinition<WReqT, WRespT> methodDef,
|
||||||
ServerStream stream,
|
ServerStream stream,
|
||||||
Metadata headers,
|
Metadata headers,
|
||||||
Context.CancellableContext context) {
|
Context.CancellableContext context,
|
||||||
|
Tag tag) {
|
||||||
|
|
||||||
ServerCallImpl<WReqT, WRespT> call = new ServerCallImpl<>(
|
ServerCallImpl<WReqT, WRespT> call = new ServerCallImpl<>(
|
||||||
stream,
|
stream,
|
||||||
|
|
@ -604,7 +632,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
context,
|
context,
|
||||||
decompressorRegistry,
|
decompressorRegistry,
|
||||||
compressorRegistry,
|
compressorRegistry,
|
||||||
serverCallTracer);
|
serverCallTracer,
|
||||||
|
tag);
|
||||||
|
|
||||||
ServerCall.Listener<WReqT> listener =
|
ServerCall.Listener<WReqT> listener =
|
||||||
methodDef.getServerCallHandler().startCall(call, headers);
|
methodDef.getServerCallHandler().startCall(call, headers);
|
||||||
|
|
@ -687,15 +716,17 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
private final Executor cancelExecutor;
|
private final Executor cancelExecutor;
|
||||||
private final Context.CancellableContext context;
|
private final Context.CancellableContext context;
|
||||||
private final ServerStream stream;
|
private final ServerStream stream;
|
||||||
|
private final Tag tag;
|
||||||
// Only accessed from callExecutor.
|
// Only accessed from callExecutor.
|
||||||
private ServerStreamListener listener;
|
private ServerStreamListener listener;
|
||||||
|
|
||||||
public JumpToApplicationThreadServerStreamListener(Executor executor,
|
public JumpToApplicationThreadServerStreamListener(Executor executor,
|
||||||
Executor cancelExecutor, ServerStream stream, Context.CancellableContext context) {
|
Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) {
|
||||||
this.callExecutor = executor;
|
this.callExecutor = executor;
|
||||||
this.cancelExecutor = cancelExecutor;
|
this.cancelExecutor = cancelExecutor;
|
||||||
this.stream = stream;
|
this.stream = stream;
|
||||||
this.context = context;
|
this.context = context;
|
||||||
|
this.tag = tag;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -725,6 +756,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void messagesAvailable(final MessageProducer producer) {
|
public void messagesAvailable(final MessageProducer producer) {
|
||||||
|
PerfMark.startTask("ServerStreamListener.messagesAvailable", tag);
|
||||||
|
final Link link = PerfMark.link();
|
||||||
|
|
||||||
final class MessagesAvailable extends ContextRunnable {
|
final class MessagesAvailable extends ContextRunnable {
|
||||||
|
|
||||||
|
|
@ -734,6 +767,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void runInContext() {
|
public void runInContext() {
|
||||||
|
PerfMark.startTask("ServerCallListener(app).messagesAvailable", tag);
|
||||||
|
link.link();
|
||||||
try {
|
try {
|
||||||
getListener().messagesAvailable(producer);
|
getListener().messagesAvailable(producer);
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
|
|
@ -742,15 +777,24 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
} catch (Error e) {
|
} catch (Error e) {
|
||||||
internalClose();
|
internalClose();
|
||||||
throw e;
|
throw e;
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
callExecutor.execute(new MessagesAvailable());
|
callExecutor.execute(new MessagesAvailable());
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerStreamListener.messagesAvailable", tag);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void halfClosed() {
|
public void halfClosed() {
|
||||||
|
PerfMark.startTask("ServerStreamListener.halfClosed", tag);
|
||||||
|
final Link link = PerfMark.link();
|
||||||
|
|
||||||
final class HalfClosed extends ContextRunnable {
|
final class HalfClosed extends ContextRunnable {
|
||||||
HalfClosed() {
|
HalfClosed() {
|
||||||
super(context);
|
super(context);
|
||||||
|
|
@ -758,6 +802,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void runInContext() {
|
public void runInContext() {
|
||||||
|
PerfMark.startTask("ServerCallListener(app).halfClosed", tag);
|
||||||
|
link.link();
|
||||||
try {
|
try {
|
||||||
getListener().halfClosed();
|
getListener().halfClosed();
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
|
|
@ -766,15 +812,30 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
} catch (Error e) {
|
} catch (Error e) {
|
||||||
internalClose();
|
internalClose();
|
||||||
throw e;
|
throw e;
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerCallListener(app).halfClosed", tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
callExecutor.execute(new HalfClosed());
|
callExecutor.execute(new HalfClosed());
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerStreamListener.halfClosed", tag);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void closed(final Status status) {
|
public void closed(final Status status) {
|
||||||
|
PerfMark.startTask("ServerStreamListener.closed", tag);
|
||||||
|
try {
|
||||||
|
closedInternal(status);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerStreamListener.closed", tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void closedInternal(final Status status) {
|
||||||
// For cancellations, promptly inform any users of the context that their work should be
|
// For cancellations, promptly inform any users of the context that their work should be
|
||||||
// aborted. Otherwise, we can wait until pending work is done.
|
// aborted. Otherwise, we can wait until pending work is done.
|
||||||
if (!status.isOk()) {
|
if (!status.isOk()) {
|
||||||
|
|
@ -782,6 +843,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
// is not serializing.
|
// is not serializing.
|
||||||
cancelExecutor.execute(new ContextCloser(context, status.getCause()));
|
cancelExecutor.execute(new ContextCloser(context, status.getCause()));
|
||||||
}
|
}
|
||||||
|
final Link link = PerfMark.link();
|
||||||
|
|
||||||
final class Closed extends ContextRunnable {
|
final class Closed extends ContextRunnable {
|
||||||
Closed() {
|
Closed() {
|
||||||
|
|
@ -790,7 +852,13 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void runInContext() {
|
public void runInContext() {
|
||||||
|
PerfMark.startTask("ServerCallListener(app).closed", tag);
|
||||||
|
link.link();
|
||||||
|
try {
|
||||||
getListener().closed(status);
|
getListener().closed(status);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerCallListener(app).closed", tag);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -799,6 +867,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onReady() {
|
public void onReady() {
|
||||||
|
PerfMark.startTask("ServerStreamListener.onReady", tag);
|
||||||
|
final Link link = PerfMark.link();
|
||||||
final class OnReady extends ContextRunnable {
|
final class OnReady extends ContextRunnable {
|
||||||
OnReady() {
|
OnReady() {
|
||||||
super(context);
|
super(context);
|
||||||
|
|
@ -806,6 +876,8 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void runInContext() {
|
public void runInContext() {
|
||||||
|
PerfMark.startTask("ServerCallListener(app).onReady", tag);
|
||||||
|
link.link();
|
||||||
try {
|
try {
|
||||||
getListener().onReady();
|
getListener().onReady();
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
|
|
@ -814,11 +886,17 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
|
||||||
} catch (Error e) {
|
} catch (Error e) {
|
||||||
internalClose();
|
internalClose();
|
||||||
throw e;
|
throw e;
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerCallListener(app).onReady", tag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
callExecutor.execute(new OnReady());
|
callExecutor.execute(new OnReady());
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("ServerStreamListener.onReady", tag);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,49 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2019 The gRPC Authors
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.grpc.perfmark;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Internal {@link PerfTag.TagFactory} and {@link PerfMarkTask} accessor. This is intended for use
|
|
||||||
* by io.grpc.perfmark, and the specifically supported packages that utilize PerfMark. If you
|
|
||||||
* *really* think you need to use this, contact the gRPC team first.
|
|
||||||
*/
|
|
||||||
public final class InternalPerfMark {
|
|
||||||
|
|
||||||
private InternalPerfMark() {}
|
|
||||||
|
|
||||||
/** Expose class to allow packages that utilize PerfMark to get PerfMarkTask instances. */
|
|
||||||
public abstract static class InternalPerfMarkTask extends PerfMarkTask {
|
|
||||||
public InternalPerfMarkTask() {}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Expose methods that create PerfTag to packages that utilize PerfMark. */
|
|
||||||
private static final long NULL_NUMERIC_TAG = 0;
|
|
||||||
private static final String NULL_STRING_TAG = null;
|
|
||||||
|
|
||||||
public static PerfTag createPerfTag(long numericTag, String stringTag) {
|
|
||||||
return PerfTag.TagFactory.create(numericTag, stringTag);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static PerfTag createPerfTag(String stringTag) {
|
|
||||||
return PerfTag.TagFactory.create(NULL_NUMERIC_TAG, stringTag);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static PerfTag createPerfTag(long numericTag) {
|
|
||||||
return PerfTag.TagFactory.create(numericTag, NULL_STRING_TAG);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,171 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2019 The gRPC Authors
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.grpc.perfmark;
|
|
||||||
|
|
||||||
import com.google.errorprone.annotations.CompileTimeConstant;
|
|
||||||
import io.grpc.perfmark.PerfTag.TagFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* PerfMark is a collection of stub methods for marking key points in the RPC lifecycle. This
|
|
||||||
* class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this yet.
|
|
||||||
*/
|
|
||||||
public final class PerfMark {
|
|
||||||
private PerfMark() {
|
|
||||||
throw new AssertionError("nope");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start a Task with a Tag to identify it; a task represents some work that spans some time, and
|
|
||||||
* you are interested in both its start time and end time.
|
|
||||||
*
|
|
||||||
* @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't
|
|
||||||
* use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task
|
|
||||||
* does not have a numeric tag associated. In this case, you are encouraged to use {@link
|
|
||||||
* #taskStart(String)} or {@link PerfTag#create(String)}.
|
|
||||||
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
|
|
||||||
* Otherwise, instrumentation result will show "(invalid name)" for this task.
|
|
||||||
*/
|
|
||||||
public static void taskStart(PerfTag tag, @CompileTimeConstant String taskName) {}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start a Task; a task represents some work that spans some time, and you are interested in both
|
|
||||||
* its start time and end time.
|
|
||||||
*
|
|
||||||
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
|
|
||||||
* Otherwise, instrumentation result will show "(invalid name)" for this task.
|
|
||||||
*/
|
|
||||||
public static void taskStart(@CompileTimeConstant String taskName) {}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* End a Task with a Tag to identify it; a task represents some work that spans some time, and
|
|
||||||
* you are interested in both its start time and end time.
|
|
||||||
*
|
|
||||||
* @param tag a Tag object associated with the task start. This should be the tag used for the
|
|
||||||
* corresponding {@link #taskStart(PerfTag, String)} call.
|
|
||||||
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
|
|
||||||
* Otherwise, instrumentation result will show "(invalid name)" for this task. This should
|
|
||||||
* be the name used by the corresponding {@link #taskStart(PerfTag, String)} call.
|
|
||||||
*/
|
|
||||||
public static void taskEnd(PerfTag tag, @CompileTimeConstant String taskName) {}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* End a Task with a Tag to identify it; a task represents some work that spans some time, and
|
|
||||||
* you are interested in both its start time and end time.
|
|
||||||
*
|
|
||||||
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
|
|
||||||
* Otherwise, instrumentation result will show "(invalid name)" for this task. This should
|
|
||||||
* be the name used by the corresponding {@link #taskStart(String)} call.
|
|
||||||
*/
|
|
||||||
public static void taskEnd(@CompileTimeConstant String taskName) {}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start a Task with a Tag to identify it in a try-with-resource statement; a task represents some
|
|
||||||
* work that spans some time, and you are interested in both its start time and end time.
|
|
||||||
*
|
|
||||||
* <p>Use this in a try-with-resource statement so that task will end automatically.
|
|
||||||
*
|
|
||||||
* @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't
|
|
||||||
* use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task
|
|
||||||
* does not have a numeric tag associated. In this case, you are encouraged to use {@link
|
|
||||||
* #task(String)} or {@link PerfTag#create(String)}.
|
|
||||||
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
|
|
||||||
* Otherwise, instrumentation result will show "(invalid name)" for this task.
|
|
||||||
*/
|
|
||||||
public static PerfMarkTask task(PerfTag tag, @CompileTimeConstant String taskName) {
|
|
||||||
return NoopTask.INSTANCE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start a Task it in a try-with-resource statement; a task represents some work that spans some
|
|
||||||
* time, and you are interested in both its start time and end time.
|
|
||||||
*
|
|
||||||
* <p>Use this in a try-with-resource statement so that task will end automatically.
|
|
||||||
*
|
|
||||||
* @param taskName The name of the task. <b>This parameter must be a compile-time constant!</b>
|
|
||||||
* Otherwise, instrumentation result will show "(invalid name)" for this task.
|
|
||||||
*/
|
|
||||||
public static PerfMarkTask task(@CompileTimeConstant String taskName) {
|
|
||||||
return NoopTask.INSTANCE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Records an Event with a Tag to identify it.
|
|
||||||
*
|
|
||||||
* <p>An Event is different from a Task in that you don't care how much time it spanned. You are
|
|
||||||
* interested in only the time it happened.
|
|
||||||
*
|
|
||||||
* @param tag a Tag object associated with the task. See {@link PerfTag} for description. Don't
|
|
||||||
* use 0 for the {@code numericTag} of the Tag object. 0 is reserved to represent that a task
|
|
||||||
* does not have a numeric tag associated. In this case, you are encouraged to use {@link
|
|
||||||
* #event(String)} or {@link PerfTag#create(String)}.
|
|
||||||
* @param eventName The name of the event. <b>This parameter must be a compile-time constant!</b>
|
|
||||||
* Otherwise, instrumentation result will show "(invalid name)" for this event.
|
|
||||||
*/
|
|
||||||
public static void event(PerfTag tag, @CompileTimeConstant String eventName) {}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Records an Event.
|
|
||||||
*
|
|
||||||
* <p>An Event is different from a Task in that you don't care how much time it spanned. You are
|
|
||||||
* interested in only the time it happened.
|
|
||||||
*
|
|
||||||
* @param eventName The name of the event. <b>This parameter must be a compile-time constant!</b>
|
|
||||||
* Otherwise, instrumentation result will show "(invalid name)" for this event.
|
|
||||||
*/
|
|
||||||
public static void event(@CompileTimeConstant String eventName) {}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If PerfMark instrumentation is not enabled, returns a Tag with numericTag = 0L. Replacement
|
|
||||||
* for {@link TagFactory#create(long, String)} if PerfMark agent is enabled.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
public static PerfTag createTag(
|
|
||||||
@SuppressWarnings("unused") long numericTag, @SuppressWarnings("unused") String stringTag) {
|
|
||||||
// Warning suppression is safe as this method returns by default the NULL_PERF_TAG
|
|
||||||
return NULL_PERF_TAG;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If PerfMark instrumentation is not enabled returns a Tag with numericTag = 0L. Replacement
|
|
||||||
* for {@link TagFactory#create(String)} if PerfMark agent is enabled.
|
|
||||||
*/
|
|
||||||
public static PerfTag createTag(@SuppressWarnings("unused") String stringTag) {
|
|
||||||
// Warning suppression is safe as this method returns by default the NULL_PERF_TAG
|
|
||||||
return NULL_PERF_TAG;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If PerfMark instrumentation is not enabled returns a Tag with numericTag = 0L. Replacement
|
|
||||||
* for {@link TagFactory#create(long)} if PerfMark agent is enabled.
|
|
||||||
*/
|
|
||||||
public static PerfTag createTag(@SuppressWarnings("unused") long numericTag) {
|
|
||||||
// Warning suppression is safe as this method returns by default the NULL_PERF_TAG
|
|
||||||
return NULL_PERF_TAG;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final PerfTag NULL_PERF_TAG = TagFactory.create();
|
|
||||||
|
|
||||||
private static final class NoopTask extends PerfMarkTask {
|
|
||||||
|
|
||||||
private static final PerfMarkTask INSTANCE = new NoopTask();
|
|
||||||
|
|
||||||
NoopTask() {}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2019 The gRPC Authors
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.grpc.perfmark;
|
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class exists to make it easier for users to use the try-with-resource shorthand for
|
|
||||||
* starting and ending a PerfMark Task. This class is {@link io.grpc.Internal} and
|
|
||||||
* {@link io.grpc.ExperimentalApi}. Do not use this yet.
|
|
||||||
*/
|
|
||||||
public abstract class PerfMarkTask implements Closeable {
|
|
||||||
@Override
|
|
||||||
public abstract void close();
|
|
||||||
|
|
||||||
PerfMarkTask() {}
|
|
||||||
}
|
|
||||||
|
|
@ -1,71 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2019 The gRPC Authors
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.grpc.perfmark;
|
|
||||||
|
|
||||||
import java.lang.annotation.ElementType;
|
|
||||||
import java.lang.annotation.Retention;
|
|
||||||
import java.lang.annotation.RetentionPolicy;
|
|
||||||
import java.lang.annotation.Target;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Annotation to add PerfMark instrumentation points surrounding method invocation.
|
|
||||||
*
|
|
||||||
* <p>This class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this
|
|
||||||
* yet.
|
|
||||||
*/
|
|
||||||
@Retention(RetentionPolicy.RUNTIME)
|
|
||||||
@Target(ElementType.METHOD)
|
|
||||||
// TODO(carl-mastrangelo): Add this line back in and make it okay on Android
|
|
||||||
//@IncompatibleModifiers(value = {Modifier.ABSTRACT, Modifier.NATIVE})
|
|
||||||
public @interface PerfMarker {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The name of the task; e.g. `parseHeaders`.
|
|
||||||
*/
|
|
||||||
String taskName();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An optional computed tag.
|
|
||||||
*
|
|
||||||
* <p>There are 3 supported references that can be used
|
|
||||||
* <ul>
|
|
||||||
* <li>{@code "this"}: Then the tag will be the {@link Object#toString} of the current class.
|
|
||||||
* Only valid for instance methods.
|
|
||||||
* <li>{@code "someFieldName"}: Then the tag will be the result of
|
|
||||||
* calling {@link String#valueOf(Object)} on the field. The field cannot be a primitive or
|
|
||||||
* and array type. (Though we may revisit this in the future).
|
|
||||||
* <li>{@code "$N"}: Then the tag will be the result of calling {@link String#valueOf(Object)}
|
|
||||||
* on the Nth method parameter. Parameters are {@code 0} indexed so {@code "$1"} is the
|
|
||||||
* second parameter. The referenced parameter cannot be a primitive or an array type.
|
|
||||||
* (Though we may revisit this in the future).
|
|
||||||
* </ul>
|
|
||||||
*
|
|
||||||
* <p>In general you should reference either {@code "this"} or {@code final} fields since
|
|
||||||
* in these cases we can cache the operations to decrease the cost of computing the tags. A side
|
|
||||||
* effect of this is that for such references we will not have their tags recalculated after the
|
|
||||||
* first time. Thus it is best to use immutable objects for tags.
|
|
||||||
*/
|
|
||||||
String computedTag() default "";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* True if class with annotation is immutable and instrumentation must adhere to this restriction.
|
|
||||||
* If enableSampling is passed as argument to the agent, instrumentation points with <code>
|
|
||||||
* immutable = true </code> are ignored.
|
|
||||||
*/
|
|
||||||
boolean immutable() default false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
@ -1,118 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2019 The gRPC Authors
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.grpc.perfmark;
|
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
|
||||||
import javax.annotation.concurrent.Immutable;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A Tag is used to provide additional information to identify a task and consists of a 64-bit
|
|
||||||
* integer value and a string.
|
|
||||||
*
|
|
||||||
* <p>Both the {@code numericTag} and the {@code stringTag} are optional. The {@code numericTag}
|
|
||||||
* value can be used to identify the specific task being worked on (e.g. the id of the rpc call).
|
|
||||||
* The {@code stringTag} can be used to store any value that is not a compile-time constant (a
|
|
||||||
* restriction imposed for the name passed to PerfMark tasks and events). A value of 0 for the
|
|
||||||
* {@code numericTag} is considered null. Don't use 0 for the {@code numericTag} unless you intend
|
|
||||||
* to specify null. In that case you are encouraged to use {@link #create(String)}.
|
|
||||||
*
|
|
||||||
* <p>Invocations to {@code create} methods in this class are a no-op unless PerfMark
|
|
||||||
* instrumentation is enabled. If so, calls to {@code create} methods to this class are replaced for
|
|
||||||
* calls to {@link TagFactory} create methods.
|
|
||||||
*
|
|
||||||
* <p>This class is {@link io.grpc.Internal} and {@link io.grpc.ExperimentalApi}. Do not use this
|
|
||||||
* yet.
|
|
||||||
*/
|
|
||||||
@Immutable
|
|
||||||
public final class PerfTag {
|
|
||||||
|
|
||||||
private static final long NULL_NUMERIC_TAG = 0;
|
|
||||||
private static final String NULL_STRING_TAG = null;
|
|
||||||
|
|
||||||
private final long numericTag;
|
|
||||||
private final String stringTag;
|
|
||||||
|
|
||||||
private PerfTag(long numericTag, @Nullable String stringTag) {
|
|
||||||
this.numericTag = numericTag;
|
|
||||||
this.stringTag = stringTag;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns the numeric tag if set, or {@link #NULL_NUMERIC_TAG} instead. */
|
|
||||||
public long getNumericTag() {
|
|
||||||
return numericTag;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Returns the string tag if set, or {@link #NULL_STRING_TAG} instead. */
|
|
||||||
@Nullable public String getStringTag() {
|
|
||||||
return stringTag;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "Tag(numericTag=" + numericTag + ",stringTag='" + stringTag + "')";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
int longHashCode = (int)(numericTag ^ (numericTag >>> 32));
|
|
||||||
return longHashCode + (stringTag != null ? stringTag.hashCode() : 31);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
@SuppressWarnings("ReferenceEquality") // No Java 8 yet.
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (!(obj instanceof PerfTag)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
PerfTag that = (PerfTag) obj;
|
|
||||||
return numericTag == that.numericTag
|
|
||||||
&& (stringTag == that.stringTag || (stringTag != null && stringTag.equals(that.stringTag)));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Provides methods that create Tag instances which should not be directly invoked by clients.
|
|
||||||
*
|
|
||||||
* <p>Calls to {@link PerfMark#create(long)}, {@link PerfMark#create(long, String)} and {@link
|
|
||||||
* PerfMark#create(String)} are replaced with calls to the methods in this class using bytecode
|
|
||||||
* rewriting, if enabled.
|
|
||||||
*/
|
|
||||||
static final class TagFactory {
|
|
||||||
/**
|
|
||||||
* This class should not be instantiated.
|
|
||||||
*/
|
|
||||||
private TagFactory() {
|
|
||||||
throw new AssertionError("nope");
|
|
||||||
}
|
|
||||||
|
|
||||||
public static PerfTag create(long numericTag, String stringTag) {
|
|
||||||
return new PerfTag(numericTag, stringTag);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static PerfTag create(String stringTag) {
|
|
||||||
return new PerfTag(NULL_NUMERIC_TAG, stringTag);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static PerfTag create(long numericTag) {
|
|
||||||
return new PerfTag(numericTag, NULL_STRING_TAG);
|
|
||||||
}
|
|
||||||
|
|
||||||
static PerfTag create() {
|
|
||||||
return new PerfTag(NULL_NUMERIC_TAG, NULL_STRING_TAG);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
@ -1,24 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2019 The gRPC Authors
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is an internal, experimental API and not subject to the normal compatibility guarantees.
|
|
||||||
*
|
|
||||||
* @see io.grpc.Internal
|
|
||||||
*/
|
|
||||||
@javax.annotation.CheckReturnValue
|
|
||||||
@javax.annotation.ParametersAreNonnullByDefault
|
|
||||||
package io.grpc.perfmark;
|
|
||||||
|
|
@ -43,6 +43,7 @@ import io.grpc.ServerCall;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
|
import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
|
||||||
import io.grpc.internal.testing.SingleMessageProducer;
|
import io.grpc.internal.testing.SingleMessageProducer;
|
||||||
|
import io.perfmark.PerfMark;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
|
|
@ -90,7 +91,7 @@ public class ServerCallImplTest {
|
||||||
context = Context.ROOT.withCancellation();
|
context = Context.ROOT.withCancellation();
|
||||||
call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context,
|
call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context,
|
||||||
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
|
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
|
||||||
serverCallTracer);
|
serverCallTracer, PerfMark.createTag());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -113,7 +114,7 @@ public class ServerCallImplTest {
|
||||||
|
|
||||||
call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context,
|
call = new ServerCallImpl<>(stream, UNARY_METHOD, requestHeaders, context,
|
||||||
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
|
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
|
||||||
tracer);
|
tracer, PerfMark.createTag());
|
||||||
|
|
||||||
// required boilerplate
|
// required boilerplate
|
||||||
call.sendHeaders(new Metadata());
|
call.sendHeaders(new Metadata());
|
||||||
|
|
@ -224,7 +225,8 @@ public class ServerCallImplTest {
|
||||||
context,
|
context,
|
||||||
DecompressorRegistry.getDefaultInstance(),
|
DecompressorRegistry.getDefaultInstance(),
|
||||||
CompressorRegistry.getDefaultInstance(),
|
CompressorRegistry.getDefaultInstance(),
|
||||||
serverCallTracer);
|
serverCallTracer,
|
||||||
|
PerfMark.createTag());
|
||||||
serverCall.sendHeaders(new Metadata());
|
serverCall.sendHeaders(new Metadata());
|
||||||
serverCall.sendMessage(1L);
|
serverCall.sendMessage(1L);
|
||||||
verify(stream, times(1)).writeMessage(any(InputStream.class));
|
verify(stream, times(1)).writeMessage(any(InputStream.class));
|
||||||
|
|
@ -258,7 +260,8 @@ public class ServerCallImplTest {
|
||||||
context,
|
context,
|
||||||
DecompressorRegistry.getDefaultInstance(),
|
DecompressorRegistry.getDefaultInstance(),
|
||||||
CompressorRegistry.getDefaultInstance(),
|
CompressorRegistry.getDefaultInstance(),
|
||||||
serverCallTracer);
|
serverCallTracer,
|
||||||
|
PerfMark.createTag());
|
||||||
serverCall.sendHeaders(new Metadata());
|
serverCall.sendHeaders(new Metadata());
|
||||||
serverCall.sendMessage(1L);
|
serverCall.sendMessage(1L);
|
||||||
serverCall.sendMessage(1L);
|
serverCall.sendMessage(1L);
|
||||||
|
|
@ -295,7 +298,8 @@ public class ServerCallImplTest {
|
||||||
context,
|
context,
|
||||||
DecompressorRegistry.getDefaultInstance(),
|
DecompressorRegistry.getDefaultInstance(),
|
||||||
CompressorRegistry.getDefaultInstance(),
|
CompressorRegistry.getDefaultInstance(),
|
||||||
serverCallTracer);
|
serverCallTracer,
|
||||||
|
PerfMark.createTag());
|
||||||
serverCall.close(Status.OK, new Metadata());
|
serverCall.close(Status.OK, new Metadata());
|
||||||
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
|
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
|
||||||
verify(stream, times(1)).cancel(statusCaptor.capture());
|
verify(stream, times(1)).cancel(statusCaptor.capture());
|
||||||
|
|
|
||||||
|
|
@ -77,6 +77,7 @@ import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener;
|
||||||
import io.grpc.internal.testing.SingleMessageProducer;
|
import io.grpc.internal.testing.SingleMessageProducer;
|
||||||
import io.grpc.internal.testing.TestServerStreamTracer;
|
import io.grpc.internal.testing.TestServerStreamTracer;
|
||||||
import io.grpc.util.MutableHandlerRegistry;
|
import io.grpc.util.MutableHandlerRegistry;
|
||||||
|
import io.perfmark.PerfMark;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
@ -1140,7 +1141,8 @@ public class ServerImplTest {
|
||||||
executor.getScheduledExecutorService(),
|
executor.getScheduledExecutorService(),
|
||||||
executor.getScheduledExecutorService(),
|
executor.getScheduledExecutorService(),
|
||||||
stream,
|
stream,
|
||||||
Context.ROOT.withCancellation());
|
Context.ROOT.withCancellation(),
|
||||||
|
PerfMark.createTag());
|
||||||
ServerStreamListener mockListener = mock(ServerStreamListener.class);
|
ServerStreamListener mockListener = mock(ServerStreamListener.class);
|
||||||
listener.setListener(mockListener);
|
listener.setListener(mockListener);
|
||||||
|
|
||||||
|
|
@ -1165,7 +1167,8 @@ public class ServerImplTest {
|
||||||
executor.getScheduledExecutorService(),
|
executor.getScheduledExecutorService(),
|
||||||
executor.getScheduledExecutorService(),
|
executor.getScheduledExecutorService(),
|
||||||
stream,
|
stream,
|
||||||
Context.ROOT.withCancellation());
|
Context.ROOT.withCancellation(),
|
||||||
|
PerfMark.createTag());
|
||||||
ServerStreamListener mockListener = mock(ServerStreamListener.class);
|
ServerStreamListener mockListener = mock(ServerStreamListener.class);
|
||||||
listener.setListener(mockListener);
|
listener.setListener(mockListener);
|
||||||
|
|
||||||
|
|
@ -1190,7 +1193,8 @@ public class ServerImplTest {
|
||||||
executor.getScheduledExecutorService(),
|
executor.getScheduledExecutorService(),
|
||||||
executor.getScheduledExecutorService(),
|
executor.getScheduledExecutorService(),
|
||||||
stream,
|
stream,
|
||||||
Context.ROOT.withCancellation());
|
Context.ROOT.withCancellation(),
|
||||||
|
PerfMark.createTag());
|
||||||
ServerStreamListener mockListener = mock(ServerStreamListener.class);
|
ServerStreamListener mockListener = mock(ServerStreamListener.class);
|
||||||
listener.setListener(mockListener);
|
listener.setListener(mockListener);
|
||||||
|
|
||||||
|
|
@ -1213,7 +1217,8 @@ public class ServerImplTest {
|
||||||
executor.getScheduledExecutorService(),
|
executor.getScheduledExecutorService(),
|
||||||
executor.getScheduledExecutorService(),
|
executor.getScheduledExecutorService(),
|
||||||
stream,
|
stream,
|
||||||
Context.ROOT.withCancellation());
|
Context.ROOT.withCancellation(),
|
||||||
|
PerfMark.createTag());
|
||||||
ServerStreamListener mockListener = mock(ServerStreamListener.class);
|
ServerStreamListener mockListener = mock(ServerStreamListener.class);
|
||||||
listener.setListener(mockListener);
|
listener.setListener(mockListener);
|
||||||
|
|
||||||
|
|
@ -1236,7 +1241,8 @@ public class ServerImplTest {
|
||||||
executor.getScheduledExecutorService(),
|
executor.getScheduledExecutorService(),
|
||||||
executor.getScheduledExecutorService(),
|
executor.getScheduledExecutorService(),
|
||||||
stream,
|
stream,
|
||||||
Context.ROOT.withCancellation());
|
Context.ROOT.withCancellation(),
|
||||||
|
PerfMark.createTag());
|
||||||
ServerStreamListener mockListener = mock(ServerStreamListener.class);
|
ServerStreamListener mockListener = mock(ServerStreamListener.class);
|
||||||
listener.setListener(mockListener);
|
listener.setListener(mockListener);
|
||||||
|
|
||||||
|
|
@ -1259,7 +1265,8 @@ public class ServerImplTest {
|
||||||
executor.getScheduledExecutorService(),
|
executor.getScheduledExecutorService(),
|
||||||
executor.getScheduledExecutorService(),
|
executor.getScheduledExecutorService(),
|
||||||
stream,
|
stream,
|
||||||
Context.ROOT.withCancellation());
|
Context.ROOT.withCancellation(),
|
||||||
|
PerfMark.createTag());
|
||||||
ServerStreamListener mockListener = mock(ServerStreamListener.class);
|
ServerStreamListener mockListener = mock(ServerStreamListener.class);
|
||||||
listener.setListener(mockListener);
|
listener.setListener(mockListener);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -25,5 +25,6 @@ java_library(
|
||||||
"@io_netty_netty_handler_proxy//jar",
|
"@io_netty_netty_handler_proxy//jar",
|
||||||
"@io_netty_netty_resolver//jar",
|
"@io_netty_netty_resolver//jar",
|
||||||
"@io_netty_netty_transport//jar",
|
"@io_netty_netty_transport//jar",
|
||||||
|
"@io_perfmark_perfmark_api//jar",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,7 @@ import io.netty.channel.EventLoop;
|
||||||
import io.netty.handler.codec.http2.Http2Headers;
|
import io.netty.handler.codec.http2.Http2Headers;
|
||||||
import io.netty.handler.codec.http2.Http2Stream;
|
import io.netty.handler.codec.http2.Http2Stream;
|
||||||
import io.netty.util.AsciiString;
|
import io.netty.util.AsciiString;
|
||||||
|
import io.perfmark.PerfMark;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -114,8 +115,18 @@ class NettyClientStream extends AbstractClientStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
private class Sink implements AbstractClientStream.Sink {
|
private class Sink implements AbstractClientStream.Sink {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeHeaders(Metadata headers, byte[] requestPayload) {
|
public void writeHeaders(Metadata headers, byte[] requestPayload) {
|
||||||
|
PerfMark.startTask("NettyClientStream$Sink.writeHeaders");
|
||||||
|
try {
|
||||||
|
writeHeadersInternal(headers, requestPayload);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("NettyClientStream$Sink.writeHeaders");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeHeadersInternal(Metadata headers, byte[] requestPayload) {
|
||||||
// Convert the headers into Netty HTTP/2 headers.
|
// Convert the headers into Netty HTTP/2 headers.
|
||||||
AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method);
|
AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method);
|
||||||
if (defaultPath == null) {
|
if (defaultPath == null) {
|
||||||
|
|
@ -152,15 +163,13 @@ class NettyClientStream extends AbstractClientStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Write the command requesting the creation of the stream.
|
// Write the command requesting the creation of the stream.
|
||||||
writeQueue.enqueue(
|
writeQueue.enqueue(
|
||||||
new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get),
|
new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get),
|
||||||
!method.getType().clientSendsOneMessage() || get).addListener(failureListener);
|
!method.getType().clientSendsOneMessage() || get).addListener(failureListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private void writeFrameInternal(
|
||||||
public void writeFrame(
|
|
||||||
WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
|
WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
|
||||||
Preconditions.checkArgument(numMessages >= 0);
|
Preconditions.checkArgument(numMessages >= 0);
|
||||||
ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
|
ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
|
||||||
|
|
@ -184,12 +193,23 @@ class NettyClientStream extends AbstractClientStream {
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
// The frame is empty and will not impact outbound flow control. Just send it.
|
// The frame is empty and will not impact outbound flow control. Just send it.
|
||||||
writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
|
writeQueue.enqueue(
|
||||||
|
new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void request(final int numMessages) {
|
public void writeFrame(
|
||||||
|
WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
|
||||||
|
PerfMark.startTask("NettyClientStream$Sink.writeFrame");
|
||||||
|
try {
|
||||||
|
writeFrameInternal(frame, endOfStream, flush, numMessages);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("NettyClientStream$Sink.writeFrame");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void requestInternal(final int numMessages) {
|
||||||
if (channel.eventLoop().inEventLoop()) {
|
if (channel.eventLoop().inEventLoop()) {
|
||||||
// Processing data read in the event loop so can call into the deframer immediately
|
// Processing data read in the event loop so can call into the deframer immediately
|
||||||
transportState().requestMessagesFromDeframer(numMessages);
|
transportState().requestMessagesFromDeframer(numMessages);
|
||||||
|
|
@ -203,9 +223,24 @@ class NettyClientStream extends AbstractClientStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void request(int numMessages) {
|
||||||
|
PerfMark.startTask("NettyClientStream$Sink.request");
|
||||||
|
try {
|
||||||
|
requestInternal(numMessages);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("NettyClientStream$Sink.request");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancel(Status status) {
|
public void cancel(Status status) {
|
||||||
|
PerfMark.startTask("NettyClientStream$Sink.cancel");
|
||||||
|
try {
|
||||||
writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true);
|
writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("NettyClientStream$Sink.cancel");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,7 @@ import io.netty.channel.ChannelFutureListener;
|
||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
import io.netty.handler.codec.http2.Http2Headers;
|
import io.netty.handler.codec.http2.Http2Headers;
|
||||||
import io.netty.handler.codec.http2.Http2Stream;
|
import io.netty.handler.codec.http2.Http2Stream;
|
||||||
|
import io.perfmark.PerfMark;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
|
@ -91,8 +92,8 @@ class NettyServerStream extends AbstractServerStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
private class Sink implements AbstractServerStream.Sink {
|
private class Sink implements AbstractServerStream.Sink {
|
||||||
@Override
|
|
||||||
public void request(final int numMessages) {
|
private void requestInternal(final int numMessages) {
|
||||||
if (channel.eventLoop().inEventLoop()) {
|
if (channel.eventLoop().inEventLoop()) {
|
||||||
// Processing data read in the event loop so can call into the deframer immediately
|
// Processing data read in the event loop so can call into the deframer immediately
|
||||||
transportState().requestMessagesFromDeframer(numMessages);
|
transportState().requestMessagesFromDeframer(numMessages);
|
||||||
|
|
@ -106,17 +107,31 @@ class NettyServerStream extends AbstractServerStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void request(final int numMessages) {
|
||||||
|
PerfMark.startTask("NettyServerStream$Sink.request");
|
||||||
|
try {
|
||||||
|
requestInternal(numMessages);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("NettyServerStream$Sink.request");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeHeaders(Metadata headers) {
|
public void writeHeaders(Metadata headers) {
|
||||||
|
PerfMark.startTask("NettyServerStream$Sink.writeHeaders");
|
||||||
|
try {
|
||||||
writeQueue.enqueue(
|
writeQueue.enqueue(
|
||||||
SendResponseHeadersCommand.createHeaders(
|
SendResponseHeadersCommand.createHeaders(
|
||||||
transportState(),
|
transportState(),
|
||||||
Utils.convertServerHeaders(headers)),
|
Utils.convertServerHeaders(headers)),
|
||||||
true);
|
true);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("NettyServerStream$Sink.writeHeaders");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private void writeFrameInternal(WritableBuffer frame, boolean flush, final int numMessages) {
|
||||||
public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) {
|
|
||||||
Preconditions.checkArgument(numMessages >= 0);
|
Preconditions.checkArgument(numMessages >= 0);
|
||||||
if (frame == null) {
|
if (frame == null) {
|
||||||
writeQueue.scheduleFlush();
|
writeQueue.scheduleFlush();
|
||||||
|
|
@ -140,17 +155,37 @@ class NettyServerStream extends AbstractServerStream {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) {
|
||||||
|
PerfMark.startTask("NettyServerStream$Sink.writeFrame");
|
||||||
|
try {
|
||||||
|
writeFrameInternal(frame, flush, numMessages);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("NettyServerStream$Sink.writeFrame");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
|
public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
|
||||||
|
PerfMark.startTask("NettyServerStream$Sink.writeTrailers");
|
||||||
|
try {
|
||||||
Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent);
|
Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent);
|
||||||
writeQueue.enqueue(
|
writeQueue.enqueue(
|
||||||
SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status),
|
SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status),
|
||||||
true);
|
true);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("NettyServerStream$Sink.writeTrailers");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancel(Status status) {
|
public void cancel(Status status) {
|
||||||
|
PerfMark.startTask("NettyServerStream$Sink.cancel");
|
||||||
|
try {
|
||||||
writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true);
|
writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true);
|
||||||
|
} finally {
|
||||||
|
PerfMark.startTask("NettyServerStream$Sink.cancel");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,8 @@ import io.netty.buffer.ByteBufHolder;
|
||||||
import io.netty.buffer.DefaultByteBufHolder;
|
import io.netty.buffer.DefaultByteBufHolder;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
|
import io.perfmark.Link;
|
||||||
|
import io.perfmark.PerfMark;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Command sent from the transport to the Netty channel to send a GRPC frame to the remote endpoint.
|
* Command sent from the transport to the Netty channel to send a GRPC frame to the remote endpoint.
|
||||||
|
|
@ -28,6 +30,7 @@ import io.netty.channel.ChannelPromise;
|
||||||
final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.QueuedCommand {
|
final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.QueuedCommand {
|
||||||
private final StreamIdHolder stream;
|
private final StreamIdHolder stream;
|
||||||
private final boolean endStream;
|
private final boolean endStream;
|
||||||
|
private final Link link;
|
||||||
|
|
||||||
private ChannelPromise promise;
|
private ChannelPromise promise;
|
||||||
|
|
||||||
|
|
@ -35,6 +38,12 @@ final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQu
|
||||||
super(content);
|
super(content);
|
||||||
this.stream = stream;
|
this.stream = stream;
|
||||||
this.endStream = endStream;
|
this.endStream = endStream;
|
||||||
|
this.link = PerfMark.link();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Link getLink() {
|
||||||
|
return link;
|
||||||
}
|
}
|
||||||
|
|
||||||
int streamId() {
|
int streamId() {
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,8 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelPromise;
|
import io.netty.channel.ChannelPromise;
|
||||||
|
import io.perfmark.Link;
|
||||||
|
import io.perfmark.PerfMark;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
@ -104,26 +106,44 @@ class WriteQueue {
|
||||||
* called in the event loop
|
* called in the event loop
|
||||||
*/
|
*/
|
||||||
private void flush() {
|
private void flush() {
|
||||||
|
PerfMark.startTask("WriteQueue.periodicFlush");
|
||||||
try {
|
try {
|
||||||
QueuedCommand cmd;
|
QueuedCommand cmd;
|
||||||
int i = 0;
|
int i = 0;
|
||||||
boolean flushedOnce = false;
|
boolean flushedOnce = false;
|
||||||
while ((cmd = queue.poll()) != null) {
|
while ((cmd = queue.poll()) != null) {
|
||||||
|
PerfMark.startTask("WriteQueue.run");
|
||||||
|
try {
|
||||||
|
cmd.getLink().link();
|
||||||
cmd.run(channel);
|
cmd.run(channel);
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("WriteQueue.run");
|
||||||
|
}
|
||||||
if (++i == DEQUE_CHUNK_SIZE) {
|
if (++i == DEQUE_CHUNK_SIZE) {
|
||||||
i = 0;
|
i = 0;
|
||||||
// Flush each chunk so we are releasing buffers periodically. In theory this loop
|
// Flush each chunk so we are releasing buffers periodically. In theory this loop
|
||||||
// might never end as new events are continuously added to the queue, if we never
|
// might never end as new events are continuously added to the queue, if we never
|
||||||
// flushed in that case we would be guaranteed to OOM.
|
// flushed in that case we would be guaranteed to OOM.
|
||||||
|
PerfMark.startTask("WriteQueue.flush0");
|
||||||
|
try {
|
||||||
channel.flush();
|
channel.flush();
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("WriteQueue.flush0");
|
||||||
|
}
|
||||||
flushedOnce = true;
|
flushedOnce = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Must flush at least once, even if there were no writes.
|
// Must flush at least once, even if there were no writes.
|
||||||
if (i != 0 || !flushedOnce) {
|
if (i != 0 || !flushedOnce) {
|
||||||
|
PerfMark.startTask("WriteQueue.flush1");
|
||||||
|
try {
|
||||||
channel.flush();
|
channel.flush();
|
||||||
|
} finally {
|
||||||
|
PerfMark.stopTask("WriteQueue.flush1");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
PerfMark.stopTask("WriteQueue.periodicFlush");
|
||||||
// Mark the write as done, if the queue is non-empty after marking trigger a new write.
|
// Mark the write as done, if the queue is non-empty after marking trigger a new write.
|
||||||
scheduled.set(false);
|
scheduled.set(false);
|
||||||
if (!queue.isEmpty()) {
|
if (!queue.isEmpty()) {
|
||||||
|
|
@ -134,8 +154,10 @@ class WriteQueue {
|
||||||
|
|
||||||
private static class RunnableCommand implements QueuedCommand {
|
private static class RunnableCommand implements QueuedCommand {
|
||||||
private final Runnable runnable;
|
private final Runnable runnable;
|
||||||
|
private final Link link;
|
||||||
|
|
||||||
public RunnableCommand(Runnable runnable) {
|
public RunnableCommand(Runnable runnable) {
|
||||||
|
this.link = PerfMark.link();
|
||||||
this.runnable = runnable;
|
this.runnable = runnable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -153,11 +175,21 @@ class WriteQueue {
|
||||||
public final void run(Channel channel) {
|
public final void run(Channel channel) {
|
||||||
runnable.run();
|
runnable.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Link getLink() {
|
||||||
|
return link;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract static class AbstractQueuedCommand implements QueuedCommand {
|
abstract static class AbstractQueuedCommand implements QueuedCommand {
|
||||||
|
|
||||||
private ChannelPromise promise;
|
private ChannelPromise promise;
|
||||||
|
private final Link link;
|
||||||
|
|
||||||
|
AbstractQueuedCommand() {
|
||||||
|
this.link = PerfMark.link();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void promise(ChannelPromise promise) {
|
public final void promise(ChannelPromise promise) {
|
||||||
|
|
@ -173,6 +205,11 @@ class WriteQueue {
|
||||||
public final void run(Channel channel) {
|
public final void run(Channel channel) {
|
||||||
channel.write(this, promise);
|
channel.write(this, promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Link getLink() {
|
||||||
|
return link;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -190,5 +227,7 @@ class WriteQueue {
|
||||||
void promise(ChannelPromise promise);
|
void promise(ChannelPromise promise);
|
||||||
|
|
||||||
void run(Channel channel);
|
void run(Channel channel);
|
||||||
|
|
||||||
|
Link getLink();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ def grpc_java_repositories(
|
||||||
omit_io_netty_tcnative_boringssl_static = False,
|
omit_io_netty_tcnative_boringssl_static = False,
|
||||||
omit_io_opencensus_api = False,
|
omit_io_opencensus_api = False,
|
||||||
omit_io_opencensus_grpc_metrics = False,
|
omit_io_opencensus_grpc_metrics = False,
|
||||||
|
omit_io_perfmark = False,
|
||||||
omit_javax_annotation = False,
|
omit_javax_annotation = False,
|
||||||
omit_junit_junit = False,
|
omit_junit_junit = False,
|
||||||
omit_net_zlib = False,
|
omit_net_zlib = False,
|
||||||
|
|
@ -103,6 +104,8 @@ def grpc_java_repositories(
|
||||||
io_opencensus_api()
|
io_opencensus_api()
|
||||||
if not omit_io_opencensus_grpc_metrics:
|
if not omit_io_opencensus_grpc_metrics:
|
||||||
io_opencensus_grpc_metrics()
|
io_opencensus_grpc_metrics()
|
||||||
|
if not omit_io_perfmark:
|
||||||
|
io_perfmark()
|
||||||
if not omit_javax_annotation:
|
if not omit_javax_annotation:
|
||||||
javax_annotation()
|
javax_annotation()
|
||||||
if not omit_junit_junit:
|
if not omit_junit_junit:
|
||||||
|
|
@ -402,6 +405,15 @@ def io_opencensus_grpc_metrics():
|
||||||
licenses = ["notice"], # Apache 2.0
|
licenses = ["notice"], # Apache 2.0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def io_perfmark():
|
||||||
|
jvm_maven_import_external(
|
||||||
|
name = "io_perfmark_perfmark_api",
|
||||||
|
artifact = "io.perfmark:perfmark-api:0.16.0",
|
||||||
|
server_urls = ["http://central.maven.org/maven2"],
|
||||||
|
artifact_sha256 = "a93667875ea9d10315177768739a18d6c667df041c982d2841645ae8558d0af0",
|
||||||
|
licenses = ["notice"], # Apache 2.0
|
||||||
|
)
|
||||||
|
|
||||||
def javax_annotation():
|
def javax_annotation():
|
||||||
# Use //stub:javax_annotation for neverlink=1 support.
|
# Use //stub:javax_annotation for neverlink=1 support.
|
||||||
jvm_maven_import_external(
|
jvm_maven_import_external(
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue