core: add client and server call perfmark annotations

This code is highly experimental, so we can change it at will later. This PR is to go ahead and try it out.

Perfmark task calls are added to the client and server calls public APIs, recording when the calls begin and end. They use separate scope IDs (roughly these are Thread IDs) for the listener and the call due to no synchronization between them. However, they both use the same PerfTags, which allows them to be associated.

In the future, we can plumb the tag down into the stream to include deeper information about whats going on in a call.
This commit is contained in:
Carl Mastrangelo 2019-04-03 18:16:22 -07:00 committed by GitHub
parent a17f8ab62c
commit faf6ff9f98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 177 additions and 22 deletions

View File

@ -40,6 +40,7 @@ java_library(
visibility = ["//:__subpackages__"],
deps = [
":core",
":perfmark",
"//context",
"@com_google_android_annotations//jar",
"@com_google_code_findbugs_jsr305//jar",
@ -70,3 +71,18 @@ java_library(
"@com_google_j2objc_j2objc_annotations//jar",
],
)
java_library(
name = "perfmark",
srcs = glob(
[
"src/main/java/io/grpc/perfmark/*.java",
],
exclude = ["src/main/java/io/grpc/perfmark/package-info.java"],
),
visibility = ["//:__subpackages__"],
deps = [
"@com_google_code_findbugs_jsr305//jar",
"@com_google_errorprone_error_prone_annotations//jar",
],
)

View File

@ -99,7 +99,7 @@ public abstract class ServerCall<ReqT, RespT> {
*
* <p>Servers use this mechanism to provide back-pressure to the client for flow-control.
*
* <p>This method is safe to call from multiple threads without external synchronizaton.
* <p>This method is safe to call from multiple threads without external synchronization.
*
* @param numMessages the requested number of messages to be delivered to the listener.
*/

View File

@ -46,6 +46,8 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.perfmark.PerfMark;
import io.grpc.perfmark.PerfTag;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.concurrent.CancellationException;
@ -67,6 +69,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
= "gzip".getBytes(Charset.forName("US-ASCII"));
private final MethodDescriptor<ReqT, RespT> method;
private final PerfTag tag;
private final Executor callExecutor;
private final CallTracer channelCallsTracer;
private final Context context;
@ -92,6 +95,8 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
CallTracer channelCallsTracer,
boolean retryEnabled) {
this.method = method;
// TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
this.tag = PerfTag.create(PerfTag.allocateNumericId(), method.getFullMethodName());
// If we know that the executor is a direct executor, we don't need to wrap it with a
// SerializingExecutor. This is purely for performance reasons.
// See https://github.com/grpc/grpc-java/issues/368
@ -177,7 +182,17 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
}
@Override
public void start(final Listener<RespT> observer, Metadata headers) {
public void start(Listener<RespT> observer, Metadata headers) {
PerfMark.taskStart(
tag.getNumericTag(), Thread.currentThread().getName(), tag, "ClientCall.start");
try {
startInternal(observer, headers);
} finally {
PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName());
}
}
private void startInternal(final Listener<RespT> observer, Metadata headers) {
checkState(stream == null, "Already started");
checkState(!cancelCalled, "call was cancelled");
checkNotNull(observer, "observer");
@ -371,6 +386,16 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
PerfMark.taskStart(
tag.getNumericTag(), Thread.currentThread().getName(), tag, "ClientCall.cancel");
try {
cancelInternal(message, cause);
} finally {
PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName());
}
}
private void cancelInternal(@Nullable String message, @Nullable Throwable cause) {
if (message == null && cause == null) {
cause = new CancellationException("Cancelled without a message or cause");
log.log(Level.WARNING, "Cancelling without a message or cause is suboptimal", cause);
@ -401,6 +426,16 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void halfClose() {
PerfMark.taskStart(
tag.getNumericTag(), Thread.currentThread().getName(), tag, "ClientCall.halfClose");
try {
halfCloseInternal();
} finally {
PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName());
}
}
private void halfCloseInternal() {
checkState(stream != null, "Not started");
checkState(!cancelCalled, "call was cancelled");
checkState(!halfCloseCalled, "call already half-closed");
@ -410,6 +445,16 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void sendMessage(ReqT message) {
PerfMark.taskStart(
tag.getNumericTag(), Thread.currentThread().getName(), tag, "ClientCall.sendMessage");
try {
sendMessageInternal(message);
} finally {
PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName());
}
}
private void sendMessageInternal(ReqT message) {
checkState(stream != null, "Not started");
checkState(!cancelCalled, "call was cancelled");
checkState(!halfCloseCalled, "call was half-closed");
@ -467,6 +512,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private class ClientStreamListenerImpl implements ClientStreamListener {
private final Listener<RespT> observer;
private boolean closed;
private final long listenerScopeId = PerfTag.allocateNumericId();
public ClientStreamListenerImpl(Listener<RespT> observer) {
this.observer = checkNotNull(observer, "observer");
@ -474,23 +520,27 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void headersRead(final Metadata headers) {
class HeadersRead extends ContextRunnable {
final class HeadersRead extends ContextRunnable {
HeadersRead() {
super(context);
}
@Override
public final void runInContext() {
if (closed) {
return;
}
PerfMark.taskStart(
listenerScopeId, Thread.currentThread().getName(), tag, "ClientCall.headersRead");
try {
if (closed) {
return;
}
observer.onHeaders(headers);
} catch (Throwable t) {
Status status =
Status.CANCELLED.withCause(t).withDescription("Failed to read headers");
stream.cancel(status);
close(status, new Metadata());
} finally {
PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName());
}
}
}
@ -500,7 +550,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void messagesAvailable(final MessageProducer producer) {
class MessagesAvailable extends ContextRunnable {
final class MessagesAvailable extends ContextRunnable {
MessagesAvailable() {
super(context);
}
@ -511,9 +561,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
GrpcUtil.closeQuietly(producer);
return;
}
InputStream message;
PerfMark.taskStart(
listenerScopeId,
Thread.currentThread().getName(),
tag,
"ClientCall.messagesAvailable");
try {
InputStream message;
while ((message = producer.next()) != null) {
try {
observer.onMessage(method.parseResponse(message));
@ -529,6 +583,8 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
Status.CANCELLED.withCause(t).withDescription("Failed to read message.");
stream.cancel(status);
close(status, new Metadata());
} finally {
PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName());
}
}
}
@ -570,7 +626,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
}
final Status savedStatus = status;
final Metadata savedTrailers = trailers;
class StreamClosed extends ContextRunnable {
final class StreamClosed extends ContextRunnable {
StreamClosed() {
super(context);
}
@ -581,7 +637,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
// We intentionally don't keep the status or metadata from the server.
return;
}
close(savedStatus, savedTrailers);
PerfMark.taskStart(
listenerScopeId, Thread.currentThread().getName(), tag, "ClientCall.closed");
try {
close(savedStatus, savedTrailers);
} finally {
PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName());
}
}
}
@ -590,13 +652,15 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public void onReady() {
class StreamOnReady extends ContextRunnable {
final class StreamOnReady extends ContextRunnable {
StreamOnReady() {
super(context);
}
@Override
public final void runInContext() {
PerfMark.taskStart(
listenerScopeId, Thread.currentThread().getName(), tag, "ClientCall.onReady");
try {
observer.onReady();
} catch (Throwable t) {
@ -604,6 +668,8 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
Status.CANCELLED.withCause(t).withDescription("Failed to call onReady.");
stream.cancel(status);
close(status, new Metadata());
} finally {
PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName());
}
}
}

View File

@ -36,6 +36,8 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.perfmark.PerfMark;
import io.grpc.perfmark.PerfTag;
import java.io.InputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -51,6 +53,7 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
private final ServerStream stream;
private final MethodDescriptor<ReqT, RespT> method;
private final PerfTag tag;
private final Context.CancellableContext context;
private final byte[] messageAcceptEncoding;
private final DecompressorRegistry decompressorRegistry;
@ -70,6 +73,8 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
CallTracer serverCallTracer) {
this.stream = stream;
this.method = method;
// TODO(carl-mastrangelo): consider moving this to the ServerImpl to record startCall.
this.tag = PerfTag.create(PerfTag.allocateNumericId(), method.getFullMethodName());
this.context = context;
this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);
this.decompressorRegistry = decompressorRegistry;
@ -85,6 +90,16 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override
public void sendHeaders(Metadata headers) {
PerfMark.taskStart(
tag.getNumericTag(), Thread.currentThread().getName(), tag, "ServerCall.sendHeaders");
try {
sendHeadersInternal(headers);
} finally {
PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName());
}
}
private void sendHeadersInternal(Metadata headers) {
checkState(!sendHeadersCalled, "sendHeaders has already been called");
checkState(!closeCalled, "call is closed");
@ -125,6 +140,16 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override
public void sendMessage(RespT message) {
PerfMark.taskStart(
tag.getNumericTag(), Thread.currentThread().getName(), tag, "ServerCall.sendMessage");
try {
sendMessageInternal(message);
} finally {
PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName());
}
}
private void sendMessageInternal(RespT message) {
checkState(sendHeadersCalled, "sendHeaders has not been called");
checkState(!closeCalled, "call is closed");
@ -169,6 +194,16 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override
public void close(Status status, Metadata trailers) {
PerfMark.taskStart(
tag.getNumericTag(), Thread.currentThread().getName(), tag, "ServerCall.close");
try {
closeInternal(status, trailers);
} finally {
PerfMark.taskEnd(tag.getNumericTag(), Thread.currentThread().getName());
}
}
private void closeInternal(Status status, Metadata trailers) {
checkState(!closeCalled, "call already closed");
try {
closeCalled = true;
@ -228,6 +263,7 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
private final ServerCallImpl<ReqT, ?> call;
private final ServerCall.Listener<ReqT> listener;
private final Context.CancellableContext context;
private final long listenerScopeId = PerfTag.allocateNumericId();
public ServerStreamListenerImpl(
ServerCallImpl<ReqT, ?> call, ServerCall.Listener<ReqT> listener,
@ -256,6 +292,11 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
return;
}
PerfMark.taskStart(
listenerScopeId,
Thread.currentThread().getName(),
call.tag,
"ServerCall.messagesAvailable");
InputStream message;
try {
while ((message = producer.next()) != null) {
@ -271,6 +312,8 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
GrpcUtil.closeQuietly(producer);
MoreThrowables.throwIfUnchecked(t);
throw new RuntimeException(t);
} finally {
PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName());
}
}
@ -280,22 +323,42 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
return;
}
listener.onHalfClose();
PerfMark.taskStart(
listenerScopeId,
Thread.currentThread().getName(),
call.tag,
"ServerCall.halfClosed");
try {
listener.onHalfClose();
} finally {
PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName());
}
}
@Override
public void closed(Status status) {
PerfMark.taskStart(
listenerScopeId,
Thread.currentThread().getName(),
call.tag,
"ServerCall.closed");
try {
if (status.isOk()) {
listener.onComplete();
} else {
call.cancelled = true;
listener.onCancel();
try {
if (status.isOk()) {
listener.onComplete();
} else {
call.cancelled = true;
listener.onCancel();
}
} finally {
// Cancel context after delivering RPC closure notification to allow the application to
// clean up and update any state based on whether onComplete or onCancel was called.
context.cancel(null);
}
} finally {
// Cancel context after delivering RPC closure notification to allow the application to
// clean up and update any state based on whether onComplete or onCancel was called.
context.cancel(null);
PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName());
}
}
@ -304,7 +367,16 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
if (call.cancelled) {
return;
}
listener.onReady();
PerfMark.taskStart(
listenerScopeId,
Thread.currentThread().getName(),
call.tag,
"ServerCall.closed");
try {
listener.onReady();
} finally {
PerfMark.taskEnd(listenerScopeId, Thread.currentThread().getName());
}
}
}
}

View File

@ -596,6 +596,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
ServerStream stream,
Metadata headers,
Context.CancellableContext context) {
ServerCallImpl<WReqT, WRespT> call = new ServerCallImpl<>(
stream,
methodDef.getMethodDescriptor(),