From 40854dc9e15675083a0968c967e0f85f49b66744 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Mon, 17 Jun 2019 15:25:39 -0700 Subject: [PATCH] core,netty: use PerfMark tags with the HTTP/2 stream ids This change removes the WriteQueue linking and splits it out into each of the commands, so that the trace is more precise, and the tag information is correct. It is still unclear what the initial Tag should be for ClientCallImpl, since it should not access the TransportState to get the HTTP/2 stream id. --- .../java/io/grpc/internal/ServerImpl.java | 2 +- .../java/io/grpc/internal/ServerImplTest.java | 3 + .../io/grpc/netty/NettyClientHandler.java | 170 ++++++++++++------ .../java/io/grpc/netty/NettyClientStream.java | 28 ++- .../io/grpc/netty/NettyClientTransport.java | 3 +- .../io/grpc/netty/NettyServerHandler.java | 131 +++++++++----- .../java/io/grpc/netty/NettyServerStream.java | 25 ++- .../io/grpc/netty/SendGrpcFrameCommand.java | 6 +- .../java/io/grpc/netty/StreamIdHolder.java | 4 + .../main/java/io/grpc/netty/WriteQueue.java | 8 +- .../io/grpc/netty/NettyClientHandlerTest.java | 8 +- .../io/grpc/netty/NettyClientStreamTest.java | 8 +- .../io/grpc/netty/NettyServerStreamTest.java | 2 +- 13 files changed, 279 insertions(+), 119 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 886fa0ff7e..f279d1169d 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -466,7 +466,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume @Override public void streamCreated(ServerStream stream, String methodName, Metadata headers) { - Tag tag = PerfMark.createTag(methodName, stream.hashCode()); + Tag tag = PerfMark.createTag(methodName, stream.streamId()); PerfMark.startTask("ServerTransportListener.streamCreated", tag); try { streamCreatedInternal(stream, methodName, headers, tag); diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index ea6c43a264..dd6aa46ee3 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -483,10 +483,12 @@ public class ServerImplTest { transportListener.streamCreated(stream, "Waiter/nonexist", requestHeaders); + verify(stream).streamId(); verify(stream).close(statusCaptor.capture(), any(Metadata.class)); Status status = statusCaptor.getValue(); assertEquals(Status.Code.UNIMPLEMENTED, status.getCode()); assertEquals("Can't find decompressor for " + decompressorName, status.getDescription()); + verifyNoMoreInteractions(stream); } @@ -786,6 +788,7 @@ public class ServerImplTest { when(stream.statsTraceContext()).thenReturn(statsTraceCtx); transportListener.streamCreated(stream, "Waiter/serve", requestHeaders); + verify(stream).streamId(); verify(stream).setListener(streamListenerCaptor.capture()); ServerStreamListener streamListener = streamListenerCaptor.getValue(); assertNotNull(streamListener); diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 5cf74d8efd..6d5ea8c01d 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -74,6 +74,8 @@ import io.netty.handler.codec.http2.Http2StreamVisitor; import io.netty.handler.codec.http2.StreamBufferingEncoder; import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor; import io.netty.handler.logging.LogLevel; +import io.perfmark.PerfMark; +import io.perfmark.Tag; import java.nio.channels.ClosedChannelException; import java.util.concurrent.Executor; import java.util.logging.Level; @@ -355,6 +357,7 @@ class NettyClientHandler extends AbstractNettyHandler { // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here: if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) { NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId)); + PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag()); stream.transportHeadersReceived(headers, endStream); } @@ -369,6 +372,7 @@ class NettyClientHandler extends AbstractNettyHandler { private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) { flowControlPing().onDataRead(data.readableBytes(), padding); NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId)); + PerfMark.event("NettyClientHandler.onDataRead", stream.tag()); stream.transportDataReceived(data, endOfStream); if (keepAliveManager != null) { keepAliveManager.onDataReceived(); @@ -381,6 +385,7 @@ class NettyClientHandler extends AbstractNettyHandler { private void onRstStreamRead(int streamId, long errorCode) { NettyClientStream.TransportState stream = clientStream(connection().stream(streamId)); if (stream != null) { + PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag()); Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode) .augmentDescription("Received Rst Stream"); stream.transportReportStatus( @@ -508,7 +513,7 @@ class NettyClientHandler extends AbstractNettyHandler { * Attempts to create a new stream from the given command. If there are too many active streams, * the creation request is queued. */ - private void createStream(final CreateStreamCommand command, final ChannelPromise promise) + private void createStream(CreateStreamCommand command, ChannelPromise promise) throws Exception { if (lifecycleManager.getShutdownThrowable() != null) { command.stream().setNonExistent(); @@ -521,7 +526,7 @@ class NettyClientHandler extends AbstractNettyHandler { } // Get the stream ID for the new stream. - final int streamId; + int streamId; try { streamId = incrementAndGetNextStreamId(); } catch (StatusException e) { @@ -539,54 +544,71 @@ class NettyClientHandler extends AbstractNettyHandler { return; } - final NettyClientStream.TransportState stream = command.stream(); - final Http2Headers headers = command.headers(); + NettyClientStream.TransportState stream = command.stream(); + Http2Headers headers = command.headers(); stream.setId(streamId); + PerfMark.startTask("NettyClientHandler.createStream", stream.tag()); + command.getLink().link(); + try { + createStreamTraced( + streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise); + } finally { + PerfMark.stopTask("NettyClientHandler.createStream", stream.tag()); + } + } + + private void createStreamTraced( + final int streamId, + final NettyClientStream.TransportState stream, + final Http2Headers headers, + boolean isGet, + final boolean shouldBeCountedForInUse, + final ChannelPromise promise) { // Create an intermediate promise so that we can intercept the failure reported back to the // application. ChannelPromise tempPromise = ctx().newPromise(); - encoder().writeHeaders(ctx(), streamId, headers, 0, command.isGet(), tempPromise) - .addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - // The http2Stream will be null in case a stream buffered in the encoder - // was canceled via RST_STREAM. - Http2Stream http2Stream = connection().stream(streamId); - if (http2Stream != null) { - stream.getStatsTraceContext().clientOutboundHeaders(); - http2Stream.setProperty(streamKey, stream); + encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise) + .addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + // The http2Stream will be null in case a stream buffered in the encoder + // was canceled via RST_STREAM. + Http2Stream http2Stream = connection().stream(streamId); + if (http2Stream != null) { + stream.getStatsTraceContext().clientOutboundHeaders(); + http2Stream.setProperty(streamKey, stream); - // This delays the in-use state until the I/O completes, which technically may - // be later than we would like. - if (command.shouldBeCountedForInUse()) { - inUseState.updateObjectInUse(http2Stream, true); - } - - // Attach the client stream to the HTTP/2 stream object as user data. - stream.setHttp2Stream(http2Stream); - } - // Otherwise, the stream has been cancelled and Netty is sending a - // RST_STREAM frame which causes it to purge pending writes from the - // flow-controller and delete the http2Stream. The stream listener has already - // been notified of cancellation so there is nothing to do. - - // Just forward on the success status to the original promise. - promise.setSuccess(); - } else { - final Throwable cause = future.cause(); - if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) { - StreamBufferingEncoder.Http2GoAwayException e = - (StreamBufferingEncoder.Http2GoAwayException) cause; - lifecycleManager.notifyShutdown(statusFromGoAway(e.errorCode(), e.debugData())); - promise.setFailure(lifecycleManager.getShutdownThrowable()); - } else { - promise.setFailure(cause); - } + // This delays the in-use state until the I/O completes, which technically may + // be later than we would like. + if (shouldBeCountedForInUse) { + inUseState.updateObjectInUse(http2Stream, true); } + + // Attach the client stream to the HTTP/2 stream object as user data. + stream.setHttp2Stream(http2Stream); } - }); + // Otherwise, the stream has been cancelled and Netty is sending a + // RST_STREAM frame which causes it to purge pending writes from the + // flow-controller and delete the http2Stream. The stream listener has already + // been notified of cancellation so there is nothing to do. + + // Just forward on the success status to the original promise. + promise.setSuccess(); + } else { + final Throwable cause = future.cause(); + if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) { + StreamBufferingEncoder.Http2GoAwayException e = + (StreamBufferingEncoder.Http2GoAwayException) cause; + lifecycleManager.notifyShutdown(statusFromGoAway(e.errorCode(), e.debugData())); + promise.setFailure(lifecycleManager.getShutdownThrowable()); + } else { + promise.setFailure(cause); + } + } + } + }); } /** @@ -595,14 +617,20 @@ class NettyClientHandler extends AbstractNettyHandler { private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd, ChannelPromise promise) { NettyClientStream.TransportState stream = cmd.stream(); - Status reason = cmd.reason(); - if (reason != null) { - stream.transportReportStatus(reason, true, new Metadata()); - } - if (!cmd.stream().isNonExistent()) { - encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise); - } else { - promise.setSuccess(); + PerfMark.startTask("NettyClientHandler.cancelStream", stream.tag()); + cmd.getLink().link(); + try { + Status reason = cmd.reason(); + if (reason != null) { + stream.transportReportStatus(reason, true, new Metadata()); + } + if (!cmd.stream().isNonExistent()) { + encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise); + } else { + promise.setSuccess(); + } + } finally { + PerfMark.stopTask("NettyClientHandler.cancelStream", stream.tag()); } } @@ -611,16 +639,33 @@ class NettyClientHandler extends AbstractNettyHandler { */ private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise) { - // Call the base class to write the HTTP/2 DATA frame. - // Note: no need to flush since this is handled by the outbound flow controller. - encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise); + PerfMark.startTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag()); + cmd.getLink().link(); + try { + // Call the base class to write the HTTP/2 DATA frame. + // Note: no need to flush since this is handled by the outbound flow controller. + encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise); + } finally { + PerfMark.stopTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag()); + } + } + + private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg, + ChannelPromise promise) { + PerfMark.startTask("NettyClientHandler.sendPingFrame"); + msg.getLink().link(); + try { + sendPingFrameTraced(ctx, msg, promise); + } finally { + PerfMark.stopTask("NettyClientHandler.sendPingFrame"); + } } /** * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is * registered to be called when the existing operation completes, and no new frame is sent. */ - private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg, + private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg, ChannelPromise promise) { // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown // but before termination. After termination, messages will no longer arrive because the @@ -690,12 +735,19 @@ class NettyClientHandler extends AbstractNettyHandler { @Override public boolean visit(Http2Stream stream) throws Http2Exception { NettyClientStream.TransportState clientStream = clientStream(stream); - if (clientStream != null) { - clientStream.transportReportStatus(msg.getStatus(), true, new Metadata()); - resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); + Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag(); + PerfMark.startTask("NettyClientHandler.forcefulClose", tag); + msg.getLink().link(); + try { + if (clientStream != null) { + clientStream.transportReportStatus(msg.getStatus(), true, new Metadata()); + resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); + } + stream.close(); + return true; + } finally { + PerfMark.stopTask("NettyClientHandler.forcefulClose", tag); } - stream.close(); - return true; } }); promise.setSuccess(); diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 0949faa981..14100c564c 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -43,7 +43,9 @@ import io.netty.channel.EventLoop; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Stream; import io.netty.util.AsciiString; +import io.perfmark.Link; import io.perfmark.PerfMark; +import io.perfmark.Tag; import javax.annotation.Nullable; /** @@ -215,9 +217,20 @@ class NettyClientStream extends AbstractClientStream { transportState().requestMessagesFromDeframer(numMessages); } else { channel.eventLoop().execute(new Runnable() { + final Link link = PerfMark.link(); @Override public void run() { - transportState().requestMessagesFromDeframer(numMessages); + PerfMark.startTask( + "NettyClientStream$Sink.requestMessagesFromDeframer", + transportState().tag()); + link.link(); + try { + transportState().requestMessagesFromDeframer(numMessages); + } finally { + PerfMark.stopTask( + "NettyClientStream$Sink.requestMessagesFromDeframer", + transportState().tag()); + } } }); } @@ -249,20 +262,25 @@ class NettyClientStream extends AbstractClientStream { implements StreamIdHolder { private static final int NON_EXISTENT_ID = -1; + private final String methodName; private final NettyClientHandler handler; private final EventLoop eventLoop; private int id; private Http2Stream http2Stream; + private Tag tag; public TransportState( NettyClientHandler handler, EventLoop eventLoop, int maxMessageSize, StatsTraceContext statsTraceCtx, - TransportTracer transportTracer) { + TransportTracer transportTracer, + String methodName) { super(maxMessageSize, statsTraceCtx, transportTracer); + this.methodName = checkNotNull(methodName, "methodName"); this.handler = checkNotNull(handler, "handler"); this.eventLoop = checkNotNull(eventLoop, "eventLoop"); + tag = PerfMark.createTag(methodName); } @Override @@ -275,6 +293,7 @@ class NettyClientStream extends AbstractClientStream { checkArgument(id > 0, "id must be positive %s", id); checkState(this.id == 0, "id has been previously set: %s", this.id); this.id = id; + this.tag = PerfMark.createTag(methodName, id); } /** @@ -359,5 +378,10 @@ class NettyClientStream extends AbstractClientStream { void transportDataReceived(ByteBuf frame, boolean endOfStream) { transportDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream); } + + @Override + public final Tag tag() { + return tag; + } } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index de01f7d361..2577422281 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -176,7 +176,8 @@ class NettyClientTransport implements ConnectionClientTransport { channel.eventLoop(), maxMessageSize, statsTraceCtx, - transportTracer) { + transportTracer, + method.getFullMethodName()) { @Override protected Status statusFromFailedFuture(ChannelFuture f) { return NettyClientTransport.this.statusFromFailedFuture(f); diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 41a630eac4..026f4e3844 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -83,6 +83,8 @@ import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor; import io.netty.handler.logging.LogLevel; import io.netty.util.AsciiString; import io.netty.util.ReferenceCountUtil; +import io.perfmark.PerfMark; +import io.perfmark.Tag; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; @@ -427,18 +429,25 @@ class NettyServerHandler extends AbstractNettyHandler { http2Stream, maxMessageSize, statsTraceCtx, - transportTracer); - String authority = getOrUpdateAuthority((AsciiString) headers.authority()); - NettyServerStream stream = new NettyServerStream( - ctx.channel(), - state, - attributes, - authority, - statsTraceCtx, - transportTracer); - transportListener.streamCreated(stream, method, metadata); - state.onStreamAllocated(); - http2Stream.setProperty(streamKey, state); + transportTracer, + method); + + PerfMark.startTask("NettyServerHandler.onHeadersRead", state.tag()); + try { + String authority = getOrUpdateAuthority((AsciiString) headers.authority()); + NettyServerStream stream = new NettyServerStream( + ctx.channel(), + state, + attributes, + authority, + statsTraceCtx, + transportTracer); + transportListener.streamCreated(stream, method, metadata); + state.onStreamAllocated(); + http2Stream.setProperty(streamKey, state); + } finally { + PerfMark.stopTask("NettyServerHandler.onHeadersRead", state.tag()); + } } catch (Exception e) { logger.log(Level.WARNING, "Exception in onHeadersRead()", e); // Throw an exception that will get handled by onStreamError. @@ -463,7 +472,12 @@ class NettyServerHandler extends AbstractNettyHandler { flowControlPing().onDataRead(data.readableBytes(), padding); try { NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId)); - stream.inboundDataReceived(data, endOfStream); + PerfMark.startTask("NettyServerHandler.onDataRead", stream.tag()); + try { + stream.inboundDataReceived(data, endOfStream); + } finally { + PerfMark.stopTask("NettyServerHandler.onDataRead", stream.tag()); + } } catch (Throwable e) { logger.log(Level.WARNING, "Exception in onDataRead()", e); // Throw an exception that will get handled by onStreamError. @@ -475,8 +489,13 @@ class NettyServerHandler extends AbstractNettyHandler { try { NettyServerStream.TransportState stream = serverStream(connection().stream(streamId)); if (stream != null) { - stream.transportReportStatus( - Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode)); + PerfMark.startTask("NettyServerHandler.onRstStreamRead", stream.tag()); + try { + stream.transportReportStatus( + Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode)); + } finally { + PerfMark.stopTask("NettyServerHandler.onRstStreamRead", stream.tag()); + } } } catch (Throwable e) { logger.log(Level.WARNING, "Exception in onRstStreamRead()", e); @@ -499,12 +518,18 @@ class NettyServerHandler extends AbstractNettyHandler { logger.log(Level.WARNING, "Stream Error", cause); NettyServerStream.TransportState serverStream = serverStream( connection().stream(Http2Exception.streamId(http2Ex))); - if (serverStream != null) { - serverStream.transportReportStatus(Utils.statusFromThrowable(cause)); + Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag(); + PerfMark.startTask("NettyServerHandler.onStreamError", tag); + try { + if (serverStream != null) { + serverStream.transportReportStatus(Utils.statusFromThrowable(cause)); + } + // TODO(ejona): Abort the stream by sending headers to help the client with debugging. + // Delegate to the base class to send a RST_STREAM. + super.onStreamError(ctx, outbound, cause, http2Ex); + } finally { + PerfMark.stopTask("NettyServerHandler.onStreamError", tag); } - // TODO(ejona): Abort the stream by sending headers to help the client with debugging. - // Delegate to the base class to send a RST_STREAM. - super.onStreamError(ctx, outbound, cause, http2Ex); } @Override @@ -623,11 +648,17 @@ class NettyServerHandler extends AbstractNettyHandler { */ private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise) throws Http2Exception { - if (cmd.endStream()) { - closeStreamWhenDone(promise, cmd.streamId()); + PerfMark.startTask("NettyServerHandler.sendGrpcFrame", cmd.stream().tag()); + cmd.getLink().link(); + try { + if (cmd.endStream()) { + closeStreamWhenDone(promise, cmd.stream().id()); + } + // Call the base class to write the HTTP/2 DATA frame. + encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise); + } finally { + PerfMark.stopTask("NettyServerHandler.sendGrpcFrame", cmd.stream().tag()); } - // Call the base class to write the HTTP/2 DATA frame. - encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise); } /** @@ -635,26 +666,38 @@ class NettyServerHandler extends AbstractNettyHandler { */ private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd, ChannelPromise promise) throws Http2Exception { - // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296 is - // fixed. - int streamId = cmd.stream().id(); - Http2Stream stream = connection().stream(streamId); - if (stream == null) { - resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise); - return; + PerfMark.startTask("NettyServerHandler.sendResponseHeaders", cmd.stream().tag()); + cmd.getLink().link(); + try { + // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296 + // is fixed. + int streamId = cmd.stream().id(); + Http2Stream stream = connection().stream(streamId); + if (stream == null) { + resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise); + return; + } + if (cmd.endOfStream()) { + closeStreamWhenDone(promise, streamId); + } + encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise); + } finally { + PerfMark.stopTask("NettyServerHandler.sendResponseHeaders", cmd.stream().tag()); } - if (cmd.endOfStream()) { - closeStreamWhenDone(promise, streamId); - } - encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise); } private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd, ChannelPromise promise) { - // Notify the listener if we haven't already. - cmd.stream().transportReportStatus(cmd.reason()); - // Terminate the stream. - encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise); + PerfMark.startTask("NettyServerHandler.cancelStream", cmd.stream().tag()); + cmd.getLink().link(); + try { + // Notify the listener if we haven't already. + cmd.stream().transportReportStatus(cmd.reason()); + // Terminate the stream. + encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise); + } finally { + PerfMark.stopTask("NettyServerHandler.cancelStream", cmd.stream().tag()); + } } private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, @@ -665,8 +708,14 @@ class NettyServerHandler extends AbstractNettyHandler { public boolean visit(Http2Stream stream) throws Http2Exception { NettyServerStream.TransportState serverStream = serverStream(stream); if (serverStream != null) { - serverStream.transportReportStatus(msg.getStatus()); - resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); + PerfMark.startTask("NettyServerHandler.forcefulClose", serverStream.tag()); + msg.getLink().link(); + try { + serverStream.transportReportStatus(msg.getStatus()); + resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); + } finally { + PerfMark.stopTask("NettyServerHandler.forcefulClose", serverStream.tag()); + } } stream.close(); return true; diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index 64ab6a3bda..5f9670172b 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -33,7 +33,9 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.EventLoop; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Stream; +import io.perfmark.Link; import io.perfmark.PerfMark; +import io.perfmark.Tag; import java.util.logging.Level; import java.util.logging.Logger; @@ -98,10 +100,21 @@ class NettyServerStream extends AbstractServerStream { // Processing data read in the event loop so can call into the deframer immediately transportState().requestMessagesFromDeframer(numMessages); } else { + final Link link = PerfMark.link(); channel.eventLoop().execute(new Runnable() { @Override public void run() { - transportState().requestMessagesFromDeframer(numMessages); + PerfMark.startTask( + "NettyServerStream$Sink.requestMessagesFromDeframer", + transportState().tag()); + link.link(); + try { + transportState().requestMessagesFromDeframer(numMessages); + } finally { + PerfMark.stopTask( + "NettyServerStream$Sink.requestMessagesFromDeframer", + transportState().tag()); + } } }); } @@ -195,6 +208,7 @@ class NettyServerStream extends AbstractServerStream { private final Http2Stream http2Stream; private final NettyServerHandler handler; private final EventLoop eventLoop; + private final Tag tag; public TransportState( NettyServerHandler handler, @@ -202,11 +216,13 @@ class NettyServerStream extends AbstractServerStream { Http2Stream http2Stream, int maxMessageSize, StatsTraceContext statsTraceCtx, - TransportTracer transportTracer) { + TransportTracer transportTracer, + String methodName) { super(maxMessageSize, statsTraceCtx, transportTracer); this.http2Stream = checkNotNull(http2Stream, "http2Stream"); this.handler = checkNotNull(handler, "handler"); this.eventLoop = eventLoop; + this.tag = PerfMark.createTag(methodName, http2Stream.id()); } @Override @@ -240,6 +256,11 @@ class NettyServerStream extends AbstractServerStream { public int id() { return http2Stream.id(); } + + @Override + public Tag tag() { + return tag; + } } @Override diff --git a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java index 4a343e8d6e..5080584218 100644 --- a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java +++ b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java @@ -46,8 +46,8 @@ final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQu return link; } - int streamId() { - return stream.id(); + StreamIdHolder stream() { + return stream; } boolean endStream() { @@ -100,7 +100,7 @@ final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQu @Override public String toString() { - return getClass().getSimpleName() + "(streamId=" + streamId() + return getClass().getSimpleName() + "(streamId=" + stream.id() + ", endStream=" + endStream + ", content=" + content() + ")"; } diff --git a/netty/src/main/java/io/grpc/netty/StreamIdHolder.java b/netty/src/main/java/io/grpc/netty/StreamIdHolder.java index 80e0449c49..65203c721d 100644 --- a/netty/src/main/java/io/grpc/netty/StreamIdHolder.java +++ b/netty/src/main/java/io/grpc/netty/StreamIdHolder.java @@ -16,10 +16,14 @@ package io.grpc.netty; +import io.perfmark.Tag; + /** Container for stream ids. */ interface StreamIdHolder { /** * Returns the id. */ int id(); + + Tag tag(); } diff --git a/netty/src/main/java/io/grpc/netty/WriteQueue.java b/netty/src/main/java/io/grpc/netty/WriteQueue.java index f3ef7ce0e1..89c0990cb5 100644 --- a/netty/src/main/java/io/grpc/netty/WriteQueue.java +++ b/netty/src/main/java/io/grpc/netty/WriteQueue.java @@ -112,13 +112,7 @@ class WriteQueue { int i = 0; boolean flushedOnce = false; while ((cmd = queue.poll()) != null) { - PerfMark.startTask("WriteQueue.run"); - try { - cmd.getLink().link(); - cmd.run(channel); - } finally { - PerfMark.stopTask("WriteQueue.run"); - } + cmd.run(channel); if (++i == DEQUE_CHUNK_SIZE) { i = 0; // Flush each chunk so we are releasing buffers periodically. In theory this loop diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index 6cdf1caf4e..e771a93eb9 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -789,7 +789,13 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase