diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index 3bcf70ac33..cda08576ea 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -25,6 +25,7 @@ import io.grpc.Compressor; import io.grpc.Decompressor; import io.perfmark.Link; import io.perfmark.PerfMark; +import io.perfmark.TaskCloseable; import java.io.InputStream; import javax.annotation.concurrent.GuardedBy; @@ -219,25 +220,19 @@ public abstract class AbstractStream implements Stream { */ private void requestMessagesFromDeframer(final int numMessages) { if (deframer instanceof ThreadOptimizedDeframer) { - PerfMark.startTask("AbstractStream.request"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) { deframer.request(numMessages); - } finally { - PerfMark.stopTask("AbstractStream.request"); } return; } final Link link = PerfMark.linkOut(); class RequestRunnable implements Runnable { @Override public void run() { - PerfMark.startTask("AbstractStream.request"); - PerfMark.linkIn(link); - try { + try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) { + PerfMark.linkIn(link); deframer.request(numMessages); } catch (Throwable t) { deframeFailed(t); - } finally { - PerfMark.stopTask("AbstractStream.request"); } } } diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 000ede7705..df28eba057 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -52,6 +52,7 @@ import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo; import io.perfmark.Link; import io.perfmark.PerfMark; import io.perfmark.Tag; +import io.perfmark.TaskCloseable; import java.io.InputStream; import java.nio.charset.Charset; import java.util.Locale; @@ -187,11 +188,9 @@ final class ClientCallImpl extends ClientCall { @Override public void start(Listener observer, Metadata headers) { - PerfMark.startTask("ClientCall.start", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.start")) { + PerfMark.attachTag(tag); startInternal(observer, headers); - } finally { - PerfMark.stopTask("ClientCall.start", tag); } } @@ -446,23 +445,19 @@ final class ClientCallImpl extends ClientCall { @Override public void request(int numMessages) { - PerfMark.startTask("ClientCall.request", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.request")) { + PerfMark.attachTag(tag); checkState(stream != null, "Not started"); checkArgument(numMessages >= 0, "Number requested must be non-negative"); stream.request(numMessages); - } finally { - PerfMark.stopTask("ClientCall.request", tag); } } @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { - PerfMark.startTask("ClientCall.cancel", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.cancel")) { + PerfMark.attachTag(tag); cancelInternal(message, cause); - } finally { - PerfMark.stopTask("ClientCall.cancel", tag); } } @@ -497,11 +492,9 @@ final class ClientCallImpl extends ClientCall { @Override public void halfClose() { - PerfMark.startTask("ClientCall.halfClose", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.halfClose")) { + PerfMark.attachTag(tag); halfCloseInternal(); - } finally { - PerfMark.stopTask("ClientCall.halfClose", tag); } } @@ -515,11 +508,9 @@ final class ClientCallImpl extends ClientCall { @Override public void sendMessage(ReqT message) { - PerfMark.startTask("ClientCall.sendMessage", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ClientCall.sendMessage")) { + PerfMark.attachTag(tag); sendMessageInternal(message); - } finally { - PerfMark.stopTask("ClientCall.sendMessage", tag); } } @@ -603,104 +594,93 @@ final class ClientCallImpl extends ClientCall { @Override public void headersRead(final Metadata headers) { - PerfMark.startTask("ClientStreamListener.headersRead", tag); - final Link link = PerfMark.linkOut(); + try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.headersRead")) { + PerfMark.attachTag(tag); + final Link link = PerfMark.linkOut(); + final class HeadersRead extends ContextRunnable { + HeadersRead() { + super(context); + } - final class HeadersRead extends ContextRunnable { - HeadersRead() { - super(context); - } + @Override + public void runInContext() { + try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.headersRead")) { + PerfMark.attachTag(tag); + PerfMark.linkIn(link); + runInternal(); + } + } - @Override - public void runInContext() { - PerfMark.startTask("ClientCall$Listener.headersRead", tag); - PerfMark.linkIn(link); - try { - runInternal(); - } finally { - PerfMark.stopTask("ClientCall$Listener.headersRead", tag); + private void runInternal() { + if (exceptionStatus != null) { + return; + } + try { + observer.onHeaders(headers); + } catch (Throwable t) { + exceptionThrown( + Status.CANCELLED.withCause(t).withDescription("Failed to read headers")); + } } } - private void runInternal() { - if (exceptionStatus != null) { - return; - } - try { - observer.onHeaders(headers); - } catch (Throwable t) { - exceptionThrown( - Status.CANCELLED.withCause(t).withDescription("Failed to read headers")); - } - } - } - - try { callExecutor.execute(new HeadersRead()); - } finally { - PerfMark.stopTask("ClientStreamListener.headersRead", tag); } } @Override public void messagesAvailable(final MessageProducer producer) { - PerfMark.startTask("ClientStreamListener.messagesAvailable", tag); - final Link link = PerfMark.linkOut(); - - final class MessagesAvailable extends ContextRunnable { - MessagesAvailable() { - super(context); - } - - @Override - public void runInContext() { - PerfMark.startTask("ClientCall$Listener.messagesAvailable", tag); - PerfMark.linkIn(link); - try { - runInternal(); - } finally { - PerfMark.stopTask("ClientCall$Listener.messagesAvailable", tag); + try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.messagesAvailable")) { + PerfMark.attachTag(tag); + final Link link = PerfMark.linkOut(); + final class MessagesAvailable extends ContextRunnable { + MessagesAvailable() { + super(context); } - } - private void runInternal() { - if (exceptionStatus != null) { - GrpcUtil.closeQuietly(producer); - return; - } - try { - InputStream message; - while ((message = producer.next()) != null) { - try { - observer.onMessage(method.parseResponse(message)); - } catch (Throwable t) { - GrpcUtil.closeQuietly(message); - throw t; - } - message.close(); + @Override + public void runInContext() { + try (TaskCloseable ignore = + PerfMark.traceTask("ClientCall$Listener.messagesAvailable")) { + PerfMark.attachTag(tag); + PerfMark.linkIn(link); + runInternal(); + } + } + + private void runInternal() { + if (exceptionStatus != null) { + GrpcUtil.closeQuietly(producer); + return; + } + try { + InputStream message; + while ((message = producer.next()) != null) { + try { + observer.onMessage(method.parseResponse(message)); + } catch (Throwable t) { + GrpcUtil.closeQuietly(message); + throw t; + } + message.close(); + } + } catch (Throwable t) { + GrpcUtil.closeQuietly(producer); + exceptionThrown( + Status.CANCELLED.withCause(t).withDescription("Failed to read message.")); } - } catch (Throwable t) { - GrpcUtil.closeQuietly(producer); - exceptionThrown( - Status.CANCELLED.withCause(t).withDescription("Failed to read message.")); } } - } - try { callExecutor.execute(new MessagesAvailable()); - } finally { - PerfMark.stopTask("ClientStreamListener.messagesAvailable", tag); } } @Override public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { - PerfMark.startTask("ClientStreamListener.closed", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.closed")) { + PerfMark.attachTag(tag); closedInternal(status, rpcProgress, trailers); - } finally { - PerfMark.stopTask("ClientStreamListener.closed", tag); } } @@ -730,12 +710,10 @@ final class ClientCallImpl extends ClientCall { @Override public void runInContext() { - PerfMark.startTask("ClientCall$Listener.onClose", tag); - PerfMark.linkIn(link); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onClose")) { + PerfMark.attachTag(tag); + PerfMark.linkIn(link); runInternal(); - } finally { - PerfMark.stopTask("ClientCall$Listener.onClose", tag); } } @@ -770,43 +748,38 @@ final class ClientCallImpl extends ClientCall { if (method.getType().clientSendsOneMessage()) { return; } + try (TaskCloseable ignore = PerfMark.traceTask("ClientStreamListener.onReady")) { + PerfMark.attachTag(tag); + final Link link = PerfMark.linkOut(); - PerfMark.startTask("ClientStreamListener.onReady", tag); - final Link link = PerfMark.linkOut(); + final class StreamOnReady extends ContextRunnable { + StreamOnReady() { + super(context); + } - final class StreamOnReady extends ContextRunnable { - StreamOnReady() { - super(context); - } + @Override + public void runInContext() { + try (TaskCloseable ignore = PerfMark.traceTask("ClientCall$Listener.onReady")) { + PerfMark.attachTag(tag); + PerfMark.linkIn(link); + runInternal(); + } + } - @Override - public void runInContext() { - PerfMark.startTask("ClientCall$Listener.onReady", tag); - PerfMark.linkIn(link); - try { - runInternal(); - } finally { - PerfMark.stopTask("ClientCall$Listener.onReady", tag); + private void runInternal() { + if (exceptionStatus != null) { + return; + } + try { + observer.onReady(); + } catch (Throwable t) { + exceptionThrown( + Status.CANCELLED.withCause(t).withDescription("Failed to call onReady.")); + } } } - private void runInternal() { - if (exceptionStatus != null) { - return; - } - try { - observer.onReady(); - } catch (Throwable t) { - exceptionThrown( - Status.CANCELLED.withCause(t).withDescription("Failed to call onReady.")); - } - } - } - - try { callExecutor.execute(new StreamOnReady()); - } finally { - PerfMark.stopTask("ClientStreamListener.onReady", tag); } } } diff --git a/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java b/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java index f820e7ff15..c3342556c9 100644 --- a/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java +++ b/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import io.grpc.Decompressor; import io.perfmark.Link; import io.perfmark.PerfMark; +import io.perfmark.TaskCloseable; import java.io.Closeable; import java.io.InputStream; import java.util.ArrayDeque; @@ -107,11 +108,9 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer { } else { if (!alreadyEnqueued) { if (currentThreadIsTransportThread) { - PerfMark.startTask("MigratingThreadDeframer.messageAvailable"); - try { + try (TaskCloseable ignore = + PerfMark.traceTask("MigratingThreadDeframer.messageAvailable")) { transportListener.messagesAvailable(messageProducer); - } finally { - PerfMark.stopTask("MigratingThreadDeframer.messageAvailable"); } } else { final Link link = PerfMark.linkOut(); @@ -119,12 +118,10 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer { // MigratingThreadDeframer transportExecutor.runOnTransportThread(new Runnable() { @Override public void run() { - PerfMark.startTask("MigratingThreadDeframer.messageAvailable"); - PerfMark.linkIn(link); - try { + try (TaskCloseable ignore = + PerfMark.traceTask("MigratingThreadDeframer.messageAvailable")) { + PerfMark.linkIn(link); transportListener.messagesAvailable(messageProducer); - } finally { - PerfMark.stopTask("MigratingThreadDeframer.messageAvailable"); } } }); @@ -145,28 +142,22 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer { // necessary processing transportExecutor.runOnTransportThread(new Runnable() { @Override public void run() { - PerfMark.startTask("MigratingThreadDeframer.request"); - PerfMark.linkIn(link); - try { + try (TaskCloseable ignore = PerfMark.traceTask("MigratingThreadDeframer.request")) { + PerfMark.linkIn(link); // Since processing continues from transport thread while this runnable was // enqueued, the state may have changed since we ran runOnTransportThread. So we // must make sure deframerOnTransportThread==true requestFromTransportThread(numMessages); - } finally { - PerfMark.stopTask("MigratingThreadDeframer.request"); } } }); return; } - PerfMark.startTask("MigratingThreadDeframer.request"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("MigratingThreadDeframer.request")) { deframer.request(numMessages); } catch (Throwable t) { appListener.deframeFailed(t); deframer.close(); // unrecoverable state - } finally { - PerfMark.stopTask("MigratingThreadDeframer.request"); } } } @@ -205,8 +196,7 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer { public void deframe(final ReadableBuffer data) { class DeframeOp implements Op, Closeable { @Override public void run(boolean isDeframerOnTransportThread) { - PerfMark.startTask("MigratingThreadDeframer.deframe"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("MigratingThreadDeframer.deframe")) { if (isDeframerOnTransportThread) { deframer.deframe(data); return; @@ -218,8 +208,6 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer { appListener.deframeFailed(t); deframer.close(); // unrecoverable state } - } finally { - PerfMark.stopTask("MigratingThreadDeframer.deframe"); } } diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index 47ffdf9caa..7555c4d78a 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -43,6 +43,7 @@ import io.grpc.ServerCall; import io.grpc.Status; import io.perfmark.PerfMark; import io.perfmark.Tag; +import io.perfmark.TaskCloseable; import java.io.InputStream; import java.util.logging.Level; import java.util.logging.Logger; @@ -89,21 +90,17 @@ final class ServerCallImpl extends ServerCall { @Override public void request(int numMessages) { - PerfMark.startTask("ServerCall.request", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.request")) { + PerfMark.attachTag(tag); stream.request(numMessages); - } finally { - PerfMark.stopTask("ServerCall.request", tag); } } @Override public void sendHeaders(Metadata headers) { - PerfMark.startTask("ServerCall.sendHeaders", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.sendHeaders")) { + PerfMark.attachTag(tag); sendHeadersInternal(headers); - } finally { - PerfMark.stopTask("ServerCall.sendHeaders", tag); } } @@ -149,11 +146,9 @@ final class ServerCallImpl extends ServerCall { @Override public void sendMessage(RespT message) { - PerfMark.startTask("ServerCall.sendMessage", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.sendMessage")) { + PerfMark.attachTag(tag); sendMessageInternal(message); - } finally { - PerfMark.stopTask("ServerCall.sendMessage", tag); } } @@ -207,11 +202,9 @@ final class ServerCallImpl extends ServerCall { @Override public void close(Status status, Metadata trailers) { - PerfMark.startTask("ServerCall.close", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ServerCall.close")) { + PerfMark.attachTag(tag); closeInternal(status, trailers); - } finally { - PerfMark.stopTask("ServerCall.close", tag); } } @@ -311,11 +304,9 @@ final class ServerCallImpl extends ServerCall { @Override public void messagesAvailable(MessageProducer producer) { - PerfMark.startTask("ServerStreamListener.messagesAvailable", call.tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) { + PerfMark.attachTag(call.tag); messagesAvailableInternal(producer); - } finally { - PerfMark.stopTask("ServerStreamListener.messagesAvailable", call.tag); } } @@ -346,25 +337,21 @@ final class ServerCallImpl extends ServerCall { @Override public void halfClosed() { - PerfMark.startTask("ServerStreamListener.halfClosed", call.tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) { + PerfMark.attachTag(call.tag); if (call.cancelled) { return; } listener.onHalfClose(); - } finally { - PerfMark.stopTask("ServerStreamListener.halfClosed", call.tag); } } @Override public void closed(Status status) { - PerfMark.startTask("ServerStreamListener.closed", call.tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) { + PerfMark.attachTag(call.tag); closedInternal(status); - } finally { - PerfMark.stopTask("ServerStreamListener.closed", call.tag); } } @@ -390,14 +377,12 @@ final class ServerCallImpl extends ServerCall { @Override public void onReady() { - PerfMark.startTask("ServerStreamListener.onReady", call.tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) { + PerfMark.attachTag(call.tag); if (call.cancelled) { return; } listener.onReady(); - } finally { - PerfMark.stopTask("ServerCall.closed", call.tag); } } } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index bbd52c14bb..268e4b2987 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -58,6 +58,7 @@ import io.grpc.Status; import io.perfmark.Link; import io.perfmark.PerfMark; import io.perfmark.Tag; +import io.perfmark.TaskCloseable; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; @@ -461,11 +462,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void streamCreated(ServerStream stream, String methodName, Metadata headers) { Tag tag = PerfMark.createTag(methodName, stream.streamId()); - PerfMark.startTask("ServerTransportListener.streamCreated", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ServerTransportListener.streamCreated")) { + PerfMark.attachTag(tag); streamCreatedInternal(stream, methodName, headers, tag); - } finally { - PerfMark.stopTask("ServerTransportListener.streamCreated", tag); } } @@ -523,12 +522,11 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { - PerfMark.startTask("ServerTransportListener$MethodLookup.startCall", tag); - PerfMark.linkIn(link); - try { + try (TaskCloseable ignore = + PerfMark.traceTask("ServerTransportListener$MethodLookup.startCall")) { + PerfMark.attachTag(tag); + PerfMark.linkIn(link); runInternal(); - } finally { - PerfMark.stopTask("ServerTransportListener$MethodLookup.startCall", tag); } } @@ -598,12 +596,11 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { - PerfMark.startTask("ServerTransportListener$HandleServerCall.startCall", tag); - PerfMark.linkIn(link); - try { + try (TaskCloseable ignore = + PerfMark.traceTask("ServerTransportListener$HandleServerCall.startCall")) { + PerfMark.linkIn(link); + PerfMark.attachTag(tag); runInternal(); - } finally { - PerfMark.stopTask("ServerTransportListener$HandleServerCall.startCall", tag); } } @@ -818,76 +815,65 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void messagesAvailable(final MessageProducer producer) { - PerfMark.startTask("ServerStreamListener.messagesAvailable", tag); - final Link link = PerfMark.linkOut(); + try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.messagesAvailable")) { + PerfMark.attachTag(tag); + final Link link = PerfMark.linkOut(); + final class MessagesAvailable extends ContextRunnable { - final class MessagesAvailable extends ContextRunnable { + MessagesAvailable() { + super(context); + } - MessagesAvailable() { - super(context); - } - - @Override - public void runInContext() { - PerfMark.startTask("ServerCallListener(app).messagesAvailable", tag); - PerfMark.linkIn(link); - try { - getListener().messagesAvailable(producer); - } catch (Throwable t) { - internalClose(t); - throw t; - } finally { - PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag); + @Override + public void runInContext() { + try (TaskCloseable ignore = + PerfMark.traceTask("ServerCallListener(app).messagesAvailable")) { + PerfMark.attachTag(tag); + PerfMark.linkIn(link); + getListener().messagesAvailable(producer); + } catch (Throwable t) { + internalClose(t); + throw t; + } } } - } - try { callExecutor.execute(new MessagesAvailable()); - } finally { - PerfMark.stopTask("ServerStreamListener.messagesAvailable", tag); } } @Override public void halfClosed() { - PerfMark.startTask("ServerStreamListener.halfClosed", tag); - final Link link = PerfMark.linkOut(); + try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.halfClosed")) { + PerfMark.attachTag(tag); + final Link link = PerfMark.linkOut(); + final class HalfClosed extends ContextRunnable { + HalfClosed() { + super(context); + } - final class HalfClosed extends ContextRunnable { - HalfClosed() { - super(context); - } - - @Override - public void runInContext() { - PerfMark.startTask("ServerCallListener(app).halfClosed", tag); - PerfMark.linkIn(link); - try { - getListener().halfClosed(); - } catch (Throwable t) { - internalClose(t); - throw t; - } finally { - PerfMark.stopTask("ServerCallListener(app).halfClosed", tag); + @Override + public void runInContext() { + try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).halfClosed")) { + PerfMark.attachTag(tag); + PerfMark.linkIn(link); + getListener().halfClosed(); + } catch (Throwable t) { + internalClose(t); + throw t; + } } } - } - try { callExecutor.execute(new HalfClosed()); - } finally { - PerfMark.stopTask("ServerStreamListener.halfClosed", tag); } } @Override public void closed(final Status status) { - PerfMark.startTask("ServerStreamListener.closed", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.closed")) { + PerfMark.attachTag(tag); closedInternal(status); - } finally { - PerfMark.stopTask("ServerStreamListener.closed", tag); } } @@ -917,12 +903,10 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void runInContext() { - PerfMark.startTask("ServerCallListener(app).closed", tag); - PerfMark.linkIn(link); - try { + try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).closed")) { + PerfMark.attachTag(tag); + PerfMark.linkIn(link); getListener().closed(status); - } finally { - PerfMark.stopTask("ServerCallListener(app).closed", tag); } } } @@ -932,32 +916,29 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void onReady() { - PerfMark.startTask("ServerStreamListener.onReady", tag); - final Link link = PerfMark.linkOut(); - final class OnReady extends ContextRunnable { - OnReady() { - super(context); - } + try (TaskCloseable ignore = PerfMark.traceTask("ServerStreamListener.onReady")) { + PerfMark.attachTag(tag); + final Link link = PerfMark.linkOut(); - @Override - public void runInContext() { - PerfMark.startTask("ServerCallListener(app).onReady", tag); - PerfMark.linkIn(link); - try { - getListener().onReady(); - } catch (Throwable t) { - internalClose(t); - throw t; - } finally { - PerfMark.stopTask("ServerCallListener(app).onReady", tag); + final class OnReady extends ContextRunnable { + OnReady() { + super(context); + } + + @Override + public void runInContext() { + try (TaskCloseable ignore = PerfMark.traceTask("ServerCallListener(app).onReady")) { + PerfMark.attachTag(tag); + PerfMark.linkIn(link); + getListener().onReady(); + } catch (Throwable t) { + internalClose(t); + throw t; + } } } - } - try { callExecutor.execute(new OnReady()); - } finally { - PerfMark.stopTask("ServerStreamListener.onReady", tag); } } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 1ac1458515..792d76b135 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -79,6 +79,7 @@ import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor; import io.netty.handler.logging.LogLevel; import io.perfmark.PerfMark; import io.perfmark.Tag; +import io.perfmark.TaskCloseable; import java.nio.channels.ClosedChannelException; import java.util.concurrent.Executor; import java.util.logging.Level; @@ -592,13 +593,11 @@ class NettyClientHandler extends AbstractNettyHandler { Http2Headers headers = command.headers(); stream.setId(streamId); - PerfMark.startTask("NettyClientHandler.createStream", stream.tag()); - PerfMark.linkIn(command.getLink()); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) { + PerfMark.linkIn(command.getLink()); + PerfMark.attachTag(stream.tag()); createStreamTraced( streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise); - } finally { - PerfMark.stopTask("NettyClientHandler.createStream", stream.tag()); } } @@ -670,9 +669,9 @@ class NettyClientHandler extends AbstractNettyHandler { private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd, ChannelPromise promise) { NettyClientStream.TransportState stream = cmd.stream(); - PerfMark.startTask("NettyClientHandler.cancelStream", stream.tag()); - PerfMark.linkIn(cmd.getLink()); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) { + PerfMark.attachTag(stream.tag()); + PerfMark.linkIn(cmd.getLink()); Status reason = cmd.reason(); if (reason != null) { stream.transportReportStatus(reason, true, new Metadata()); @@ -682,8 +681,6 @@ class NettyClientHandler extends AbstractNettyHandler { } else { promise.setSuccess(); } - } finally { - PerfMark.stopTask("NettyClientHandler.cancelStream", stream.tag()); } } @@ -692,25 +689,20 @@ class NettyClientHandler extends AbstractNettyHandler { */ private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise) { - PerfMark.startTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag()); - PerfMark.linkIn(cmd.getLink()); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) { + PerfMark.attachTag(cmd.stream().tag()); + PerfMark.linkIn(cmd.getLink()); // Call the base class to write the HTTP/2 DATA frame. // Note: no need to flush since this is handled by the outbound flow controller. encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise); - } finally { - PerfMark.stopTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag()); } } private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg, ChannelPromise promise) { - PerfMark.startTask("NettyClientHandler.sendPingFrame"); - PerfMark.linkIn(msg.getLink()); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) { + PerfMark.linkIn(msg.getLink()); sendPingFrameTraced(ctx, msg, promise); - } finally { - PerfMark.stopTask("NettyClientHandler.sendPingFrame"); } } @@ -788,17 +780,15 @@ class NettyClientHandler extends AbstractNettyHandler { public boolean visit(Http2Stream stream) throws Http2Exception { NettyClientStream.TransportState clientStream = clientStream(stream); Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag(); - PerfMark.startTask("NettyClientHandler.forcefulClose", tag); - PerfMark.linkIn(msg.getLink()); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) { + PerfMark.linkIn(msg.getLink()); + PerfMark.attachTag(tag); if (clientStream != null) { clientStream.transportReportStatus(msg.getStatus(), true, new Metadata()); resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); } stream.close(); return true; - } finally { - PerfMark.stopTask("NettyClientHandler.forcefulClose", tag); } } }); diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 9c6c12cc0f..0c0bb7eeb8 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -46,6 +46,7 @@ import io.netty.handler.codec.http2.Http2Stream; import io.netty.util.AsciiString; import io.perfmark.PerfMark; import io.perfmark.Tag; +import io.perfmark.TaskCloseable; import javax.annotation.Nullable; /** @@ -117,11 +118,9 @@ class NettyClientStream extends AbstractClientStream { @Override public void writeHeaders(Metadata headers, byte[] requestPayload) { - PerfMark.startTask("NettyClientStream$Sink.writeHeaders"); - try { + try (TaskCloseable ignore = + PerfMark.traceTask("NettyClientStream$Sink.writeHeaders")) { writeHeadersInternal(headers, requestPayload); - } finally { - PerfMark.stopTask("NettyClientStream$Sink.writeHeaders"); } } @@ -207,21 +206,15 @@ class NettyClientStream extends AbstractClientStream { @Override public void writeFrame( WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { - PerfMark.startTask("NettyClientStream$Sink.writeFrame"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyClientStream$Sink.writeFrame")) { writeFrameInternal(frame, endOfStream, flush, numMessages); - } finally { - PerfMark.stopTask("NettyClientStream$Sink.writeFrame"); } } @Override public void cancel(Status status) { - PerfMark.startTask("NettyClientStream$Sink.cancel"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyClientStream$Sink.cancel")) { writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true); - } finally { - PerfMark.stopTask("NettyClientStream$Sink.cancel"); } } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 6382471f46..fd053125d5 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -93,6 +93,7 @@ import io.netty.util.AsciiString; import io.netty.util.ReferenceCountUtil; import io.perfmark.PerfMark; import io.perfmark.Tag; +import io.perfmark.TaskCloseable; import java.text.MessageFormat; import java.util.List; import java.util.concurrent.Future; @@ -473,8 +474,8 @@ class NettyServerHandler extends AbstractNettyHandler { transportTracer, method); - PerfMark.startTask("NettyServerHandler.onHeadersRead", state.tag()); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) { + PerfMark.attachTag(state.tag()); String authority = getOrUpdateAuthority((AsciiString) headers.authority()); NettyServerStream stream = new NettyServerStream( ctx.channel(), @@ -486,8 +487,6 @@ class NettyServerHandler extends AbstractNettyHandler { transportListener.streamCreated(stream, method, metadata); state.onStreamAllocated(); http2Stream.setProperty(streamKey, state); - } finally { - PerfMark.stopTask("NettyServerHandler.onHeadersRead", state.tag()); } } catch (Exception e) { logger.log(Level.WARNING, "Exception in onHeadersRead()", e); @@ -513,11 +512,9 @@ class NettyServerHandler extends AbstractNettyHandler { flowControlPing().onDataRead(data.readableBytes(), padding); try { NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId)); - PerfMark.startTask("NettyServerHandler.onDataRead", stream.tag()); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) { + PerfMark.attachTag(stream.tag()); stream.inboundDataReceived(data, endOfStream); - } finally { - PerfMark.stopTask("NettyServerHandler.onDataRead", stream.tag()); } } catch (Throwable e) { logger.log(Level.WARNING, "Exception in onDataRead()", e); @@ -530,12 +527,10 @@ class NettyServerHandler extends AbstractNettyHandler { try { NettyServerStream.TransportState stream = serverStream(connection().stream(streamId)); if (stream != null) { - PerfMark.startTask("NettyServerHandler.onRstStreamRead", stream.tag()); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) { + PerfMark.attachTag(stream.tag()); stream.transportReportStatus( Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode)); - } finally { - PerfMark.stopTask("NettyServerHandler.onRstStreamRead", stream.tag()); } } } catch (Throwable e) { @@ -564,16 +559,14 @@ class NettyServerHandler extends AbstractNettyHandler { } logger.log(level, "Stream Error", cause); Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag(); - PerfMark.startTask("NettyServerHandler.onStreamError", tag); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) { + PerfMark.attachTag(tag); if (serverStream != null) { serverStream.transportReportStatus(Utils.statusFromThrowable(cause)); } // TODO(ejona): Abort the stream by sending headers to help the client with debugging. // Delegate to the base class to send a RST_STREAM. super.onStreamError(ctx, outbound, cause, http2Ex); - } finally { - PerfMark.stopTask("NettyServerHandler.onStreamError", tag); } } @@ -699,16 +692,14 @@ class NettyServerHandler extends AbstractNettyHandler { */ private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise) throws Http2Exception { - PerfMark.startTask("NettyServerHandler.sendGrpcFrame", cmd.stream().tag()); - PerfMark.linkIn(cmd.getLink()); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) { + PerfMark.attachTag(cmd.stream().tag()); + PerfMark.linkIn(cmd.getLink()); if (cmd.endStream()) { closeStreamWhenDone(promise, cmd.stream().id()); } // Call the base class to write the HTTP/2 DATA frame. encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise); - } finally { - PerfMark.stopTask("NettyServerHandler.sendGrpcFrame", cmd.stream().tag()); } } @@ -717,9 +708,9 @@ class NettyServerHandler extends AbstractNettyHandler { */ private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd, ChannelPromise promise) throws Http2Exception { - PerfMark.startTask("NettyServerHandler.sendResponseHeaders", cmd.stream().tag()); - PerfMark.linkIn(cmd.getLink()); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) { + PerfMark.attachTag(cmd.stream().tag()); + PerfMark.linkIn(cmd.getLink()); // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296 // is fixed. int streamId = cmd.stream().id(); @@ -732,22 +723,18 @@ class NettyServerHandler extends AbstractNettyHandler { closeStreamWhenDone(promise, streamId); } encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise); - } finally { - PerfMark.stopTask("NettyServerHandler.sendResponseHeaders", cmd.stream().tag()); } } private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd, ChannelPromise promise) { - PerfMark.startTask("NettyServerHandler.cancelStream", cmd.stream().tag()); - PerfMark.linkIn(cmd.getLink()); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) { + PerfMark.attachTag(cmd.stream().tag()); + PerfMark.linkIn(cmd.getLink()); // Notify the listener if we haven't already. cmd.stream().transportReportStatus(cmd.reason()); // Terminate the stream. encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise); - } finally { - PerfMark.stopTask("NettyServerHandler.cancelStream", cmd.stream().tag()); } } @@ -774,13 +761,11 @@ class NettyServerHandler extends AbstractNettyHandler { public boolean visit(Http2Stream stream) throws Http2Exception { NettyServerStream.TransportState serverStream = serverStream(stream); if (serverStream != null) { - PerfMark.startTask("NettyServerHandler.forcefulClose", serverStream.tag()); - PerfMark.linkIn(msg.getLink()); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) { + PerfMark.attachTag(serverStream.tag()); + PerfMark.linkIn(msg.getLink()); serverStream.transportReportStatus(msg.getStatus()); resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); - } finally { - PerfMark.stopTask("NettyServerHandler.forcefulClose", serverStream.tag()); } } stream.close(); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index 6ab391b260..80c8d9fb3a 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -36,6 +36,7 @@ import io.netty.handler.codec.http2.Http2Stream; import io.perfmark.Link; import io.perfmark.PerfMark; import io.perfmark.Tag; +import io.perfmark.TaskCloseable; import java.util.logging.Level; import java.util.logging.Logger; @@ -94,15 +95,12 @@ class NettyServerStream extends AbstractServerStream { private class Sink implements AbstractServerStream.Sink { @Override public void writeHeaders(Metadata headers) { - PerfMark.startTask("NettyServerStream$Sink.writeHeaders"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeHeaders")) { writeQueue.enqueue( SendResponseHeadersCommand.createHeaders( transportState(), Utils.convertServerHeaders(headers)), true); - } finally { - PerfMark.stopTask("NettyServerStream$Sink.writeHeaders"); } } @@ -128,34 +126,25 @@ class NettyServerStream extends AbstractServerStream { @Override public void writeFrame(WritableBuffer frame, boolean flush, final int numMessages) { - PerfMark.startTask("NettyServerStream$Sink.writeFrame"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeFrame")) { writeFrameInternal(frame, flush, numMessages); - } finally { - PerfMark.stopTask("NettyServerStream$Sink.writeFrame"); } } @Override public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { - PerfMark.startTask("NettyServerStream$Sink.writeTrailers"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.writeTrailers")) { Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent); writeQueue.enqueue( SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status), true); - } finally { - PerfMark.stopTask("NettyServerStream$Sink.writeTrailers"); } } @Override public void cancel(Status status) { - PerfMark.startTask("NettyServerStream$Sink.cancel"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.cancel")) { writeQueue.enqueue(new CancelServerStreamCommand(transportState(), status), true); - } finally { - PerfMark.startTask("NettyServerStream$Sink.cancel"); } } } @@ -192,12 +181,11 @@ class NettyServerStream extends AbstractServerStream { eventLoop.execute(new Runnable() { @Override public void run() { - PerfMark.startTask("NettyServerStream$TransportState.runOnTransportThread", tag); - PerfMark.linkIn(link); - try { + try (TaskCloseable ignore = + PerfMark.traceTask("NettyServerStream$TransportState.runOnTransportThread")) { + PerfMark.attachTag(tag); + PerfMark.linkIn(link); r.run(); - } finally { - PerfMark.stopTask("NettyServerStream$TransportState.runOnTransportThread", tag); } } }); diff --git a/netty/src/main/java/io/grpc/netty/WriteQueue.java b/netty/src/main/java/io/grpc/netty/WriteQueue.java index f80e3fcaa5..a96b7defac 100644 --- a/netty/src/main/java/io/grpc/netty/WriteQueue.java +++ b/netty/src/main/java/io/grpc/netty/WriteQueue.java @@ -24,6 +24,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; import io.perfmark.Link; import io.perfmark.PerfMark; +import io.perfmark.TaskCloseable; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -119,8 +120,7 @@ class WriteQueue { * called in the event loop */ private void flush() { - PerfMark.startTask("WriteQueue.periodicFlush"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("WriteQueue.periodicFlush")) { QueuedCommand cmd; int i = 0; boolean flushedOnce = false; @@ -131,26 +131,19 @@ class WriteQueue { // Flush each chunk so we are releasing buffers periodically. In theory this loop // might never end as new events are continuously added to the queue, if we never // flushed in that case we would be guaranteed to OOM. - PerfMark.startTask("WriteQueue.flush0"); - try { + try (TaskCloseable ignore2 = PerfMark.traceTask("WriteQueue.flush0")) { channel.flush(); - } finally { - PerfMark.stopTask("WriteQueue.flush0"); } flushedOnce = true; } } // Must flush at least once, even if there were no writes. if (i != 0 || !flushedOnce) { - PerfMark.startTask("WriteQueue.flush1"); - try { + try (TaskCloseable ignore2 = PerfMark.traceTask("WriteQueue.flush1")) { channel.flush(); - } finally { - PerfMark.stopTask("WriteQueue.flush1"); } } } finally { - PerfMark.stopTask("WriteQueue.periodicFlush"); // Mark the write as done, if the queue is non-empty after marking trigger a new write. scheduled.set(false); if (!queue.isEmpty()) { diff --git a/okhttp/src/main/java/io/grpc/okhttp/AsyncSink.java b/okhttp/src/main/java/io/grpc/okhttp/AsyncSink.java index faf1b7e301..1ac64d7ebb 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/AsyncSink.java +++ b/okhttp/src/main/java/io/grpc/okhttp/AsyncSink.java @@ -26,6 +26,7 @@ import io.grpc.okhttp.internal.framed.FrameWriter; import io.grpc.okhttp.internal.framed.Settings; import io.perfmark.Link; import io.perfmark.PerfMark; +import io.perfmark.TaskCloseable; import java.io.IOException; import java.net.Socket; import javax.annotation.Nullable; @@ -100,8 +101,7 @@ final class AsyncSink implements Sink { if (closed) { throw new IOException("closed"); } - PerfMark.startTask("AsyncSink.write"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("AsyncSink.write")) { boolean closeSocket = false; synchronized (lock) { buffer.write(source, byteCount); @@ -130,10 +130,9 @@ final class AsyncSink implements Sink { final Link link = PerfMark.linkOut(); @Override public void doRun() throws IOException { - PerfMark.startTask("WriteRunnable.runWrite"); - PerfMark.linkIn(link); Buffer buf = new Buffer(); - try { + try (TaskCloseable ignore = PerfMark.traceTask("WriteRunnable.runWrite")) { + PerfMark.linkIn(link); int writingControlFrames; synchronized (lock) { buf.write(buffer, buffer.completeSegmentByteCount()); @@ -146,13 +145,9 @@ final class AsyncSink implements Sink { synchronized (lock) { queuedControlFrames -= writingControlFrames; } - } finally { - PerfMark.stopTask("WriteRunnable.runWrite"); } } }); - } finally { - PerfMark.stopTask("AsyncSink.write"); } } @@ -161,8 +156,7 @@ final class AsyncSink implements Sink { if (closed) { throw new IOException("closed"); } - PerfMark.startTask("AsyncSink.flush"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("AsyncSink.flush")) { synchronized (lock) { if (flushEnqueued) { return; @@ -173,23 +167,18 @@ final class AsyncSink implements Sink { final Link link = PerfMark.linkOut(); @Override public void doRun() throws IOException { - PerfMark.startTask("WriteRunnable.runFlush"); - PerfMark.linkIn(link); Buffer buf = new Buffer(); - try { + try (TaskCloseable ignore = PerfMark.traceTask("WriteRunnable.runFlush")) { + PerfMark.linkIn(link); synchronized (lock) { buf.write(buffer, buffer.size()); flushEnqueued = false; } sink.write(buf, buf.size()); sink.flush(); - } finally { - PerfMark.stopTask("WriteRunnable.runFlush"); } } }); - } finally { - PerfMark.stopTask("AsyncSink.flush"); } } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 46396b2a41..11cd177a84 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -35,6 +35,7 @@ import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.Header; import io.perfmark.PerfMark; import io.perfmark.Tag; +import io.perfmark.TaskCloseable; import java.util.List; import javax.annotation.concurrent.GuardedBy; import okio.Buffer; @@ -139,55 +140,46 @@ class OkHttpClientStream extends AbstractClientStream { class Sink implements AbstractClientStream.Sink { @Override public void writeHeaders(Metadata metadata, byte[] payload) { - PerfMark.startTask("OkHttpClientStream$Sink.writeHeaders"); - String defaultPath = "/" + method.getFullMethodName(); - if (payload != null) { - useGet = true; - defaultPath += "?" + BaseEncoding.base64().encode(payload); - } - try { + try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.writeHeaders")) { + String defaultPath = "/" + method.getFullMethodName(); + if (payload != null) { + useGet = true; + defaultPath += "?" + BaseEncoding.base64().encode(payload); + } synchronized (state.lock) { state.streamReady(metadata, defaultPath); } - } finally { - PerfMark.stopTask("OkHttpClientStream$Sink.writeHeaders"); } } @Override public void writeFrame( WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) { - PerfMark.startTask("OkHttpClientStream$Sink.writeFrame"); - Buffer buffer; - if (frame == null) { - buffer = EMPTY_BUFFER; - } else { - buffer = ((OkHttpWritableBuffer) frame).buffer(); - int size = (int) buffer.size(); - if (size > 0) { - onSendingBytes(size); + try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.writeFrame")) { + Buffer buffer; + if (frame == null) { + buffer = EMPTY_BUFFER; + } else { + buffer = ((OkHttpWritableBuffer) frame).buffer(); + int size = (int) buffer.size(); + if (size > 0) { + onSendingBytes(size); + } } - } - try { synchronized (state.lock) { state.sendBuffer(buffer, endOfStream, flush); getTransportTracer().reportMessageSent(numMessages); } - } finally { - PerfMark.stopTask("OkHttpClientStream$Sink.writeFrame"); } } @Override public void cancel(Status reason) { - PerfMark.startTask("OkHttpClientStream$Sink.cancel"); - try { + try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.cancel")) { synchronized (state.lock) { state.cancel(reason, true, null); } - } finally { - PerfMark.stopTask("OkHttpClientStream$Sink.cancel"); } } } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerStream.java index 1def5c17e0..85ed916095 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerStream.java @@ -28,6 +28,7 @@ import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.Header; import io.perfmark.PerfMark; import io.perfmark.Tag; +import io.perfmark.TaskCloseable; import java.util.List; import javax.annotation.concurrent.GuardedBy; import okio.Buffer; @@ -83,58 +84,49 @@ class OkHttpServerStream extends AbstractServerStream { class Sink implements AbstractServerStream.Sink { @Override public void writeHeaders(Metadata metadata) { - PerfMark.startTask("OkHttpServerStream$Sink.writeHeaders"); - try { + try (TaskCloseable ignore = + PerfMark.traceTask("OkHttpServerStream$Sink.writeHeaders")) { List
responseHeaders = Headers.createResponseHeaders(metadata); synchronized (state.lock) { state.sendHeaders(responseHeaders); } - } finally { - PerfMark.stopTask("OkHttpServerStream$Sink.writeHeaders"); } } @Override public void writeFrame(WritableBuffer frame, boolean flush, int numMessages) { - PerfMark.startTask("OkHttpServerStream$Sink.writeFrame"); - Buffer buffer = ((OkHttpWritableBuffer) frame).buffer(); - int size = (int) buffer.size(); - if (size > 0) { - onSendingBytes(size); - } - - try { + try (TaskCloseable ignore = + PerfMark.traceTask("OkHttpServerStream$Sink.writeFrame")) { + Buffer buffer = ((OkHttpWritableBuffer) frame).buffer(); + int size = (int) buffer.size(); + if (size > 0) { + onSendingBytes(size); + } synchronized (state.lock) { state.sendBuffer(buffer, flush); transportTracer.reportMessageSent(numMessages); } - } finally { - PerfMark.stopTask("OkHttpServerStream$Sink.writeFrame"); } } @Override public void writeTrailers(Metadata trailers, boolean headersSent, Status status) { - PerfMark.startTask("OkHttpServerStream$Sink.writeTrailers"); - try { + try (TaskCloseable ignore = + PerfMark.traceTask("OkHttpServerStream$Sink.writeTrailers")) { List
responseTrailers = Headers.createResponseTrailers(trailers, headersSent); synchronized (state.lock) { state.sendTrailers(responseTrailers); } - } finally { - PerfMark.stopTask("OkHttpServerStream$Sink.writeTrailers"); } } @Override public void cancel(Status reason) { - PerfMark.startTask("OkHttpServerStream$Sink.cancel"); - try { + try (TaskCloseable ignore = + PerfMark.traceTask("OkHttpServerStream$Sink.cancel")) { synchronized (state.lock) { state.cancel(ErrorCode.CANCEL, reason); } - } finally { - PerfMark.stopTask("OkHttpServerStream$Sink.cancel"); } } }