diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java
index 886fa0ff7e..f279d1169d 100644
--- a/core/src/main/java/io/grpc/internal/ServerImpl.java
+++ b/core/src/main/java/io/grpc/internal/ServerImpl.java
@@ -466,7 +466,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
 
     @Override
     public void streamCreated(ServerStream stream, String methodName, Metadata headers) {
-      Tag tag = PerfMark.createTag(methodName, stream.hashCode());
+      Tag tag = PerfMark.createTag(methodName, stream.streamId());
       PerfMark.startTask("ServerTransportListener.streamCreated", tag);
       try {
         streamCreatedInternal(stream, methodName, headers, tag);
diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java
index ea6c43a264..dd6aa46ee3 100644
--- a/core/src/test/java/io/grpc/internal/ServerImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java
@@ -483,10 +483,12 @@ public class ServerImplTest {
 
     transportListener.streamCreated(stream, "Waiter/nonexist", requestHeaders);
 
+    verify(stream).streamId();
     verify(stream).close(statusCaptor.capture(), any(Metadata.class));
     Status status = statusCaptor.getValue();
     assertEquals(Status.Code.UNIMPLEMENTED, status.getCode());
     assertEquals("Can't find decompressor for " + decompressorName, status.getDescription());
+    verifyNoMoreInteractions(stream);
   }
 
@@ -786,6 +788,7 @@ public class ServerImplTest {
     when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
 
     transportListener.streamCreated(stream, "Waiter/serve", requestHeaders);
+    verify(stream).streamId();
     verify(stream).setListener(streamListenerCaptor.capture());
     ServerStreamListener streamListener = streamListenerCaptor.getValue();
     assertNotNull(streamListener);
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index 5cf74d8efd..6d5ea8c01d 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -74,6 +74,8 @@ import io.netty.handler.codec.http2.Http2StreamVisitor;
 import io.netty.handler.codec.http2.StreamBufferingEncoder;
 import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
 import io.netty.handler.logging.LogLevel;
+import io.perfmark.PerfMark;
+import io.perfmark.Tag;
 import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.Executor;
 import java.util.logging.Level;
@@ -355,6 +357,7 @@ class NettyClientHandler extends AbstractNettyHandler {
     // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
     if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
       NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
+      PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
       stream.transportHeadersReceived(headers, endStream);
     }
 
@@ -369,6 +372,7 @@ class NettyClientHandler extends AbstractNettyHandler {
   private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
     flowControlPing().onDataRead(data.readableBytes(), padding);
     NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
+    PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
     stream.transportDataReceived(data, endOfStream);
     if (keepAliveManager != null) {
       keepAliveManager.onDataReceived();
@@ -381,6 +385,7 @@ class NettyClientHandler extends AbstractNettyHandler {
   private void onRstStreamRead(int streamId, long errorCode) {
     NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
     if (stream != null) {
+      PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
       Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
           .augmentDescription("Received Rst Stream");
       stream.transportReportStatus(
@@ -508,7 +513,7 @@ class NettyClientHandler extends AbstractNettyHandler {
    * Attempts to create a new stream from the given command. If there are too many active streams,
    * the creation request is queued.
    */
-  private void createStream(final CreateStreamCommand command, final ChannelPromise promise)
+  private void createStream(CreateStreamCommand command, ChannelPromise promise)
       throws Exception {
     if (lifecycleManager.getShutdownThrowable() != null) {
       command.stream().setNonExistent();
@@ -521,7 +526,7 @@ class NettyClientHandler extends AbstractNettyHandler {
     }
 
     // Get the stream ID for the new stream.
-    final int streamId;
+    int streamId;
     try {
       streamId = incrementAndGetNextStreamId();
     } catch (StatusException e) {
@@ -539,54 +544,71 @@ class NettyClientHandler extends AbstractNettyHandler {
       return;
     }
 
-    final NettyClientStream.TransportState stream = command.stream();
-    final Http2Headers headers = command.headers();
+    NettyClientStream.TransportState stream = command.stream();
+    Http2Headers headers = command.headers();
     stream.setId(streamId);
 
+    PerfMark.startTask("NettyClientHandler.createStream", stream.tag());
+    command.getLink().link();
+    try {
+      createStreamTraced(
+          streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
+    } finally {
+      PerfMark.stopTask("NettyClientHandler.createStream", stream.tag());
+    }
+  }
+
+  private void createStreamTraced(
+      final int streamId,
+      final NettyClientStream.TransportState stream,
+      final Http2Headers headers,
+      boolean isGet,
+      final boolean shouldBeCountedForInUse,
+      final ChannelPromise promise) {
     // Create an intermediate promise so that we can intercept the failure reported back to the
     // application.
     ChannelPromise tempPromise = ctx().newPromise();
-    encoder().writeHeaders(ctx(), streamId, headers, 0, command.isGet(), tempPromise)
-            .addListener(new ChannelFutureListener() {
-              @Override
-              public void operationComplete(ChannelFuture future) throws Exception {
-                if (future.isSuccess()) {
-                  // The http2Stream will be null in case a stream buffered in the encoder
-                  // was canceled via RST_STREAM.
-                  Http2Stream http2Stream = connection().stream(streamId);
-                  if (http2Stream != null) {
-                    stream.getStatsTraceContext().clientOutboundHeaders();
-                    http2Stream.setProperty(streamKey, stream);
+    encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
+        .addListener(new ChannelFutureListener() {
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+            if (future.isSuccess()) {
+              // The http2Stream will be null in case a stream buffered in the encoder
+              // was canceled via RST_STREAM.
+              Http2Stream http2Stream = connection().stream(streamId);
+              if (http2Stream != null) {
+                stream.getStatsTraceContext().clientOutboundHeaders();
+                http2Stream.setProperty(streamKey, stream);
 
-                    // This delays the in-use state until the I/O completes, which technically may
-                    // be later than we would like.
-                    if (command.shouldBeCountedForInUse()) {
-                      inUseState.updateObjectInUse(http2Stream, true);
-                    }
-
-                    // Attach the client stream to the HTTP/2 stream object as user data.
-                    stream.setHttp2Stream(http2Stream);
-                  }
-                  // Otherwise, the stream has been cancelled and Netty is sending a
-                  // RST_STREAM frame which causes it to purge pending writes from the
-                  // flow-controller and delete the http2Stream. The stream listener has already
-                  // been notified of cancellation so there is nothing to do.
-
-                  // Just forward on the success status to the original promise.
-                  promise.setSuccess();
-                } else {
-                  final Throwable cause = future.cause();
-                  if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
-                    StreamBufferingEncoder.Http2GoAwayException e =
-                        (StreamBufferingEncoder.Http2GoAwayException) cause;
-                    lifecycleManager.notifyShutdown(statusFromGoAway(e.errorCode(), e.debugData()));
-                    promise.setFailure(lifecycleManager.getShutdownThrowable());
-                  } else {
-                    promise.setFailure(cause);
-                  }
+                // This delays the in-use state until the I/O completes, which technically may
+                // be later than we would like.
+                if (shouldBeCountedForInUse) {
+                  inUseState.updateObjectInUse(http2Stream, true);
                 }
+
+                // Attach the client stream to the HTTP/2 stream object as user data.
+                stream.setHttp2Stream(http2Stream);
              }
-            });
+              // Otherwise, the stream has been cancelled and Netty is sending a
+              // RST_STREAM frame which causes it to purge pending writes from the
+              // flow-controller and delete the http2Stream. The stream listener has already
+              // been notified of cancellation so there is nothing to do.
+
+              // Just forward on the success status to the original promise.
+              promise.setSuccess();
+            } else {
+              final Throwable cause = future.cause();
+              if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
+                StreamBufferingEncoder.Http2GoAwayException e =
+                    (StreamBufferingEncoder.Http2GoAwayException) cause;
+                lifecycleManager.notifyShutdown(statusFromGoAway(e.errorCode(), e.debugData()));
+                promise.setFailure(lifecycleManager.getShutdownThrowable());
+              } else {
+                promise.setFailure(cause);
+              }
+            }
+          }
+        });
   }
 
   /**
@@ -595,14 +617,20 @@ class NettyClientHandler extends AbstractNettyHandler {
   private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
       ChannelPromise promise) {
     NettyClientStream.TransportState stream = cmd.stream();
-    Status reason = cmd.reason();
-    if (reason != null) {
-      stream.transportReportStatus(reason, true, new Metadata());
-    }
-    if (!cmd.stream().isNonExistent()) {
-      encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
-    } else {
-      promise.setSuccess();
+    PerfMark.startTask("NettyClientHandler.cancelStream", stream.tag());
+    cmd.getLink().link();
+    try {
+      Status reason = cmd.reason();
+      if (reason != null) {
+        stream.transportReportStatus(reason, true, new Metadata());
+      }
+      if (!cmd.stream().isNonExistent()) {
+        encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
+      } else {
+        promise.setSuccess();
+      }
+    } finally {
+      PerfMark.stopTask("NettyClientHandler.cancelStream", stream.tag());
     }
   }
 
@@ -611,16 +639,33 @@
    */
   private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
       ChannelPromise promise) {
-    // Call the base class to write the HTTP/2 DATA frame.
-    // Note: no need to flush since this is handled by the outbound flow controller.
-    encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
+    PerfMark.startTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag());
+    cmd.getLink().link();
+    try {
+      // Call the base class to write the HTTP/2 DATA frame.
+      // Note: no need to flush since this is handled by the outbound flow controller.
+      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
+    } finally {
+      PerfMark.stopTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag());
+    }
+  }
+
+  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
+      ChannelPromise promise) {
+    PerfMark.startTask("NettyClientHandler.sendPingFrame");
+    msg.getLink().link();
+    try {
+      sendPingFrameTraced(ctx, msg, promise);
+    } finally {
+      PerfMark.stopTask("NettyClientHandler.sendPingFrame");
+    }
   }
 
   /**
    * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
    * registered to be called when the existing operation completes, and no new frame is sent.
    */
-  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
+  private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
      ChannelPromise promise) {
     // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
     // but before termination. After termination, messages will no longer arrive because the
@@ -690,12 +735,19 @@ class NettyClientHandler extends AbstractNettyHandler {
       @Override
       public boolean visit(Http2Stream stream) throws Http2Exception {
         NettyClientStream.TransportState clientStream = clientStream(stream);
-        if (clientStream != null) {
-          clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
-          resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
+        Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
+        PerfMark.startTask("NettyClientHandler.forcefulClose", tag);
+        msg.getLink().link();
+        try {
+          if (clientStream != null) {
+            clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
+            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
+          }
+          stream.close();
+          return true;
+        } finally {
+          PerfMark.stopTask("NettyClientHandler.forcefulClose", tag);
        }
-        stream.close();
-        return true;
      }
    });
    promise.setSuccess();
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java
index 0949faa981..14100c564c 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java
@@ -43,7 +43,9 @@ import io.netty.channel.EventLoop;
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2Stream;
 import io.netty.util.AsciiString;
+import io.perfmark.Link;
 import io.perfmark.PerfMark;
+import io.perfmark.Tag;
 import javax.annotation.Nullable;
 
 /**
@@ -215,9 +217,20 @@ class NettyClientStream extends AbstractClientStream {
         transportState().requestMessagesFromDeframer(numMessages);
       } else {
         channel.eventLoop().execute(new Runnable() {
+          final Link link = PerfMark.link();
           @Override
           public void run() {
-            transportState().requestMessagesFromDeframer(numMessages);
+            PerfMark.startTask(
+                "NettyClientStream$Sink.requestMessagesFromDeframer",
+                transportState().tag());
+            link.link();
+            try {
+              transportState().requestMessagesFromDeframer(numMessages);
+            } finally {
+              PerfMark.stopTask(
+                  "NettyClientStream$Sink.requestMessagesFromDeframer",
+                  transportState().tag());
+            }
           }
         });
       }
@@ -249,20 +262,25 @@ class NettyClientStream extends AbstractClientStream {
       implements StreamIdHolder {
     private static final int NON_EXISTENT_ID = -1;
 
+    private final String methodName;
     private final NettyClientHandler handler;
     private final EventLoop eventLoop;
     private int id;
     private Http2Stream http2Stream;
+    private Tag tag;
 
     public TransportState(
         NettyClientHandler handler,
         EventLoop eventLoop,
         int maxMessageSize,
         StatsTraceContext statsTraceCtx,
-        TransportTracer transportTracer) {
+        TransportTracer transportTracer,
+        String methodName) {
       super(maxMessageSize, statsTraceCtx, transportTracer);
+      this.methodName = checkNotNull(methodName, "methodName");
       this.handler = checkNotNull(handler, "handler");
       this.eventLoop = checkNotNull(eventLoop, "eventLoop");
+      tag = PerfMark.createTag(methodName);
     }
 
     @Override
@@ -275,6 +293,7 @@ class NettyClientStream extends AbstractClientStream {
       checkArgument(id > 0, "id must be positive %s", id);
       checkState(this.id == 0, "id has been previously set: %s", this.id);
       this.id = id;
+      this.tag = PerfMark.createTag(methodName, id);
     }
 
     /**
@@ -359,5 +378,10 @@ class NettyClientStream extends AbstractClientStream {
     void transportDataReceived(ByteBuf frame, boolean endOfStream) {
       transportDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream);
     }
+
+    @Override
+    public final Tag tag() {
+      return tag;
+    }
   }
 }
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index de01f7d361..2577422281 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -176,7 +176,8 @@ class NettyClientTransport implements ConnectionClientTransport {
         channel.eventLoop(),
         maxMessageSize,
         statsTraceCtx,
-        transportTracer) {
+        transportTracer,
+        method.getFullMethodName()) {
      @Override
      protected Status statusFromFailedFuture(ChannelFuture f) {
        return NettyClientTransport.this.statusFromFailedFuture(f);
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
index 41a630eac4..026f4e3844 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
@@ -83,6 +83,8 @@ import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
 import io.netty.handler.logging.LogLevel;
 import io.netty.util.AsciiString;
 import io.netty.util.ReferenceCountUtil;
+import io.perfmark.PerfMark;
+import io.perfmark.Tag;
 import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
@@ -427,18 +429,25 @@ class NettyServerHandler extends AbstractNettyHandler {
           http2Stream,
           maxMessageSize,
           statsTraceCtx,
-          transportTracer);
-      String authority = getOrUpdateAuthority((AsciiString) headers.authority());
-      NettyServerStream stream = new NettyServerStream(
-          ctx.channel(),
-          state,
-          attributes,
-          authority,
-          statsTraceCtx,
-          transportTracer);
-      transportListener.streamCreated(stream, method, metadata);
-      state.onStreamAllocated();
-      http2Stream.setProperty(streamKey, state);
+          transportTracer,
+          method);
+
+      PerfMark.startTask("NettyServerHandler.onHeadersRead", state.tag());
+      try {
+        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
+        NettyServerStream stream = new NettyServerStream(
+            ctx.channel(),
+            state,
+            attributes,
+            authority,
+            statsTraceCtx,
+            transportTracer);
+        transportListener.streamCreated(stream, method, metadata);
+        state.onStreamAllocated();
+        http2Stream.setProperty(streamKey, state);
+      } finally {
+        PerfMark.stopTask("NettyServerHandler.onHeadersRead", state.tag());
+      }
     } catch (Exception e) {
       logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
       // Throw an exception that will get handled by onStreamError.
@@ -463,7 +472,12 @@ class NettyServerHandler extends AbstractNettyHandler {
     flowControlPing().onDataRead(data.readableBytes(), padding);
     try {
       NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
-      stream.inboundDataReceived(data, endOfStream);
+      PerfMark.startTask("NettyServerHandler.onDataRead", stream.tag());
+      try {
+        stream.inboundDataReceived(data, endOfStream);
+      } finally {
+        PerfMark.stopTask("NettyServerHandler.onDataRead", stream.tag());
+      }
     } catch (Throwable e) {
       logger.log(Level.WARNING, "Exception in onDataRead()", e);
       // Throw an exception that will get handled by onStreamError.
@@ -475,8 +489,13 @@
     try {
       NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
       if (stream != null) {
-        stream.transportReportStatus(
-            Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
+        PerfMark.startTask("NettyServerHandler.onRstStreamRead", stream.tag());
+        try {
+          stream.transportReportStatus(
+              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
+        } finally {
+          PerfMark.stopTask("NettyServerHandler.onRstStreamRead", stream.tag());
+        }
      }
    } catch (Throwable e) {
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
@@ -499,12 +518,18 @@
     logger.log(Level.WARNING, "Stream Error", cause);
     NettyServerStream.TransportState serverStream = serverStream(
         connection().stream(Http2Exception.streamId(http2Ex)));
-    if (serverStream != null) {
-      serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
+    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
+    PerfMark.startTask("NettyServerHandler.onStreamError", tag);
+    try {
+      if (serverStream != null) {
+        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
+      }
+      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
+      // Delegate to the base class to send a RST_STREAM.
+      super.onStreamError(ctx, outbound, cause, http2Ex);
+    } finally {
+      PerfMark.stopTask("NettyServerHandler.onStreamError", tag);
     }
-    // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
-    // Delegate to the base class to send a RST_STREAM.
-    super.onStreamError(ctx, outbound, cause, http2Ex);
   }
 
   @Override
@@ -623,11 +648,17 @@ class NettyServerHandler extends AbstractNettyHandler {
    */
   private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
       ChannelPromise promise) throws Http2Exception {
-    if (cmd.endStream()) {
-      closeStreamWhenDone(promise, cmd.streamId());
+    PerfMark.startTask("NettyServerHandler.sendGrpcFrame", cmd.stream().tag());
+    cmd.getLink().link();
+    try {
+      if (cmd.endStream()) {
+        closeStreamWhenDone(promise, cmd.stream().id());
+      }
+      // Call the base class to write the HTTP/2 DATA frame.
+      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
+    } finally {
+      PerfMark.stopTask("NettyServerHandler.sendGrpcFrame", cmd.stream().tag());
     }
-    // Call the base class to write the HTTP/2 DATA frame.
-    encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
   }
 
   /**
@@ -635,26 +666,38 @@ class NettyServerHandler extends AbstractNettyHandler {
    */
   private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
       ChannelPromise promise) throws Http2Exception {
-    // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296 is
-    // fixed.
-    int streamId = cmd.stream().id();
-    Http2Stream stream = connection().stream(streamId);
-    if (stream == null) {
-      resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
-      return;
+    PerfMark.startTask("NettyServerHandler.sendResponseHeaders", cmd.stream().tag());
+    cmd.getLink().link();
+    try {
+      // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296
+      // is fixed.
+      int streamId = cmd.stream().id();
+      Http2Stream stream = connection().stream(streamId);
+      if (stream == null) {
+        resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
+        return;
+      }
+      if (cmd.endOfStream()) {
+        closeStreamWhenDone(promise, streamId);
+      }
+      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
+    } finally {
+      PerfMark.stopTask("NettyServerHandler.sendResponseHeaders", cmd.stream().tag());
     }
-    if (cmd.endOfStream()) {
-      closeStreamWhenDone(promise, streamId);
-    }
-    encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
   }
 
   private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
       ChannelPromise promise) {
-    // Notify the listener if we haven't already.
-    cmd.stream().transportReportStatus(cmd.reason());
-    // Terminate the stream.
-    encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
+    PerfMark.startTask("NettyServerHandler.cancelStream", cmd.stream().tag());
+    cmd.getLink().link();
+    try {
+      // Notify the listener if we haven't already.
+      cmd.stream().transportReportStatus(cmd.reason());
+      // Terminate the stream.
+      encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
+    } finally {
+      PerfMark.stopTask("NettyServerHandler.cancelStream", cmd.stream().tag());
+    }
   }
 
   private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
@@ -665,8 +708,14 @@ class NettyServerHandler extends AbstractNettyHandler {
       public boolean visit(Http2Stream stream) throws Http2Exception {
         NettyServerStream.TransportState serverStream = serverStream(stream);
         if (serverStream != null) {
-          serverStream.transportReportStatus(msg.getStatus());
-          resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
+          PerfMark.startTask("NettyServerHandler.forcefulClose", serverStream.tag());
+          msg.getLink().link();
+          try {
+            serverStream.transportReportStatus(msg.getStatus());
+            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
+          } finally {
+            PerfMark.stopTask("NettyServerHandler.forcefulClose", serverStream.tag());
+          }
        }
        stream.close();
        return true;
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java
index 64ab6a3bda..5f9670172b 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java
@@ -33,7 +33,9 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.EventLoop;
 import io.netty.handler.codec.http2.Http2Headers;
 import io.netty.handler.codec.http2.Http2Stream;
+import io.perfmark.Link;
 import io.perfmark.PerfMark;
+import io.perfmark.Tag;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -98,10 +100,21 @@ class NettyServerStream extends AbstractServerStream {
       // Processing data read in the event loop so can call into the deframer immediately
       transportState().requestMessagesFromDeframer(numMessages);
     } else {
+      final Link link = PerfMark.link();
       channel.eventLoop().execute(new Runnable() {
         @Override
         public void run() {
-          transportState().requestMessagesFromDeframer(numMessages);
+          PerfMark.startTask(
+              "NettyServerStream$Sink.requestMessagesFromDeframer",
+              transportState().tag());
+          link.link();
+          try {
+            transportState().requestMessagesFromDeframer(numMessages);
+          } finally {
+            PerfMark.stopTask(
+                "NettyServerStream$Sink.requestMessagesFromDeframer",
+                transportState().tag());
+          }
         }
       });
     }
@@ -195,6 +208,7 @@ class NettyServerStream extends AbstractServerStream {
     private final Http2Stream http2Stream;
     private final NettyServerHandler handler;
     private final EventLoop eventLoop;
+    private final Tag tag;
 
     public TransportState(
         NettyServerHandler handler,
@@ -202,11 +216,13 @@ class NettyServerStream extends AbstractServerStream {
         Http2Stream http2Stream,
         int maxMessageSize,
         StatsTraceContext statsTraceCtx,
-        TransportTracer transportTracer) {
+        TransportTracer transportTracer,
+        String methodName) {
       super(maxMessageSize, statsTraceCtx, transportTracer);
       this.http2Stream = checkNotNull(http2Stream, "http2Stream");
       this.handler = checkNotNull(handler, "handler");
       this.eventLoop = eventLoop;
+      this.tag = PerfMark.createTag(methodName, http2Stream.id());
     }
 
     @Override
@@ -240,6 +256,11 @@ class NettyServerStream extends AbstractServerStream {
     public int id() {
       return http2Stream.id();
     }
+
+    @Override
+    public Tag tag() {
+      return tag;
+    }
   }
 
   @Override
diff --git a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java
index 4a343e8d6e..5080584218 100644
--- a/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java
+++ b/netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java
@@ -46,8 +46,8 @@ final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQu
     return link;
   }
 
-  int streamId() {
-    return stream.id();
+  StreamIdHolder stream() {
+    return stream;
   }
 
   boolean endStream() {
@@ -100,7 +100,7 @@ final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQu
 
   @Override
   public String toString() {
-    return getClass().getSimpleName() + "(streamId=" + streamId()
+    return getClass().getSimpleName() + "(streamId=" + stream.id()
         + ", endStream=" + endStream + ", content=" + content()
         + ")";
   }
diff --git a/netty/src/main/java/io/grpc/netty/StreamIdHolder.java b/netty/src/main/java/io/grpc/netty/StreamIdHolder.java
index 80e0449c49..65203c721d 100644
--- a/netty/src/main/java/io/grpc/netty/StreamIdHolder.java
+++ b/netty/src/main/java/io/grpc/netty/StreamIdHolder.java
@@ -16,10 +16,14 @@
 
 package io.grpc.netty;
 
+import io.perfmark.Tag;
+
 /** Container for stream ids. */
 interface StreamIdHolder {
   /**
    * Returns the id.
    */
   int id();
+
+  Tag tag();
 }
diff --git a/netty/src/main/java/io/grpc/netty/WriteQueue.java b/netty/src/main/java/io/grpc/netty/WriteQueue.java
index f3ef7ce0e1..89c0990cb5 100644
--- a/netty/src/main/java/io/grpc/netty/WriteQueue.java
+++ b/netty/src/main/java/io/grpc/netty/WriteQueue.java
@@ -112,13 +112,7 @@ class WriteQueue {
     int i = 0;
     boolean flushedOnce = false;
     while ((cmd = queue.poll()) != null) {
-      PerfMark.startTask("WriteQueue.run");
-      try {
-        cmd.getLink().link();
-        cmd.run(channel);
-      } finally {
-        PerfMark.stopTask("WriteQueue.run");
-      }
+      cmd.run(channel);
       if (++i == DEQUE_CHUNK_SIZE) {
         i = 0;
         // Flush each chunk so we are releasing buffers periodically. In theory this loop
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
index 6cdf1caf4e..e771a93eb9 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
@@ -789,7 +789,13 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase