core,netty: use PerfMark tags with the HTTP/2 stream ids

This change removes the WriteQueue linking and splits it out into each
of the commands, so that the trace is more precise, and the tag
information is correct.

It is still unclear what the initial Tag should be for ClientCallImpl,
since it should not access the TransportState to get the HTTP/2 stream id.
This commit is contained in:
Carl Mastrangelo 2019-06-17 15:25:39 -07:00 committed by GitHub
parent 3829574c1c
commit 40854dc9e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 279 additions and 119 deletions

View File

@ -466,7 +466,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@Override @Override
public void streamCreated(ServerStream stream, String methodName, Metadata headers) { public void streamCreated(ServerStream stream, String methodName, Metadata headers) {
Tag tag = PerfMark.createTag(methodName, stream.hashCode()); Tag tag = PerfMark.createTag(methodName, stream.streamId());
PerfMark.startTask("ServerTransportListener.streamCreated", tag); PerfMark.startTask("ServerTransportListener.streamCreated", tag);
try { try {
streamCreatedInternal(stream, methodName, headers, tag); streamCreatedInternal(stream, methodName, headers, tag);

View File

@ -483,10 +483,12 @@ public class ServerImplTest {
transportListener.streamCreated(stream, "Waiter/nonexist", requestHeaders); transportListener.streamCreated(stream, "Waiter/nonexist", requestHeaders);
verify(stream).streamId();
verify(stream).close(statusCaptor.capture(), any(Metadata.class)); verify(stream).close(statusCaptor.capture(), any(Metadata.class));
Status status = statusCaptor.getValue(); Status status = statusCaptor.getValue();
assertEquals(Status.Code.UNIMPLEMENTED, status.getCode()); assertEquals(Status.Code.UNIMPLEMENTED, status.getCode());
assertEquals("Can't find decompressor for " + decompressorName, status.getDescription()); assertEquals("Can't find decompressor for " + decompressorName, status.getDescription());
verifyNoMoreInteractions(stream); verifyNoMoreInteractions(stream);
} }
@ -786,6 +788,7 @@ public class ServerImplTest {
when(stream.statsTraceContext()).thenReturn(statsTraceCtx); when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
transportListener.streamCreated(stream, "Waiter/serve", requestHeaders); transportListener.streamCreated(stream, "Waiter/serve", requestHeaders);
verify(stream).streamId();
verify(stream).setListener(streamListenerCaptor.capture()); verify(stream).setListener(streamListenerCaptor.capture());
ServerStreamListener streamListener = streamListenerCaptor.getValue(); ServerStreamListener streamListener = streamListenerCaptor.getValue();
assertNotNull(streamListener); assertNotNull(streamListener);

View File

@ -74,6 +74,8 @@ import io.netty.handler.codec.http2.Http2StreamVisitor;
import io.netty.handler.codec.http2.StreamBufferingEncoder; import io.netty.handler.codec.http2.StreamBufferingEncoder;
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor; import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.logging.Level; import java.util.logging.Level;
@ -355,6 +357,7 @@ class NettyClientHandler extends AbstractNettyHandler {
// Stream 1 is reserved for the Upgrade response, so we should ignore its headers here: // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) { if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId)); NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
stream.transportHeadersReceived(headers, endStream); stream.transportHeadersReceived(headers, endStream);
} }
@ -369,6 +372,7 @@ class NettyClientHandler extends AbstractNettyHandler {
private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) { private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
flowControlPing().onDataRead(data.readableBytes(), padding); flowControlPing().onDataRead(data.readableBytes(), padding);
NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId)); NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
stream.transportDataReceived(data, endOfStream); stream.transportDataReceived(data, endOfStream);
if (keepAliveManager != null) { if (keepAliveManager != null) {
keepAliveManager.onDataReceived(); keepAliveManager.onDataReceived();
@ -381,6 +385,7 @@ class NettyClientHandler extends AbstractNettyHandler {
private void onRstStreamRead(int streamId, long errorCode) { private void onRstStreamRead(int streamId, long errorCode) {
NettyClientStream.TransportState stream = clientStream(connection().stream(streamId)); NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
if (stream != null) { if (stream != null) {
PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode) Status status = GrpcUtil.Http2Error.statusForCode((int) errorCode)
.augmentDescription("Received Rst Stream"); .augmentDescription("Received Rst Stream");
stream.transportReportStatus( stream.transportReportStatus(
@ -508,7 +513,7 @@ class NettyClientHandler extends AbstractNettyHandler {
* Attempts to create a new stream from the given command. If there are too many active streams, * Attempts to create a new stream from the given command. If there are too many active streams,
* the creation request is queued. * the creation request is queued.
*/ */
private void createStream(final CreateStreamCommand command, final ChannelPromise promise) private void createStream(CreateStreamCommand command, ChannelPromise promise)
throws Exception { throws Exception {
if (lifecycleManager.getShutdownThrowable() != null) { if (lifecycleManager.getShutdownThrowable() != null) {
command.stream().setNonExistent(); command.stream().setNonExistent();
@ -521,7 +526,7 @@ class NettyClientHandler extends AbstractNettyHandler {
} }
// Get the stream ID for the new stream. // Get the stream ID for the new stream.
final int streamId; int streamId;
try { try {
streamId = incrementAndGetNextStreamId(); streamId = incrementAndGetNextStreamId();
} catch (StatusException e) { } catch (StatusException e) {
@ -539,14 +544,31 @@ class NettyClientHandler extends AbstractNettyHandler {
return; return;
} }
final NettyClientStream.TransportState stream = command.stream(); NettyClientStream.TransportState stream = command.stream();
final Http2Headers headers = command.headers(); Http2Headers headers = command.headers();
stream.setId(streamId); stream.setId(streamId);
PerfMark.startTask("NettyClientHandler.createStream", stream.tag());
command.getLink().link();
try {
createStreamTraced(
streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
} finally {
PerfMark.stopTask("NettyClientHandler.createStream", stream.tag());
}
}
private void createStreamTraced(
final int streamId,
final NettyClientStream.TransportState stream,
final Http2Headers headers,
boolean isGet,
final boolean shouldBeCountedForInUse,
final ChannelPromise promise) {
// Create an intermediate promise so that we can intercept the failure reported back to the // Create an intermediate promise so that we can intercept the failure reported back to the
// application. // application.
ChannelPromise tempPromise = ctx().newPromise(); ChannelPromise tempPromise = ctx().newPromise();
encoder().writeHeaders(ctx(), streamId, headers, 0, command.isGet(), tempPromise) encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
.addListener(new ChannelFutureListener() { .addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
@ -560,7 +582,7 @@ class NettyClientHandler extends AbstractNettyHandler {
// This delays the in-use state until the I/O completes, which technically may // This delays the in-use state until the I/O completes, which technically may
// be later than we would like. // be later than we would like.
if (command.shouldBeCountedForInUse()) { if (shouldBeCountedForInUse) {
inUseState.updateObjectInUse(http2Stream, true); inUseState.updateObjectInUse(http2Stream, true);
} }
@ -595,6 +617,9 @@ class NettyClientHandler extends AbstractNettyHandler {
private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd, private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
ChannelPromise promise) { ChannelPromise promise) {
NettyClientStream.TransportState stream = cmd.stream(); NettyClientStream.TransportState stream = cmd.stream();
PerfMark.startTask("NettyClientHandler.cancelStream", stream.tag());
cmd.getLink().link();
try {
Status reason = cmd.reason(); Status reason = cmd.reason();
if (reason != null) { if (reason != null) {
stream.transportReportStatus(reason, true, new Metadata()); stream.transportReportStatus(reason, true, new Metadata());
@ -604,6 +629,9 @@ class NettyClientHandler extends AbstractNettyHandler {
} else { } else {
promise.setSuccess(); promise.setSuccess();
} }
} finally {
PerfMark.stopTask("NettyClientHandler.cancelStream", stream.tag());
}
} }
/** /**
@ -611,16 +639,33 @@ class NettyClientHandler extends AbstractNettyHandler {
*/ */
private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
ChannelPromise promise) { ChannelPromise promise) {
PerfMark.startTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag());
cmd.getLink().link();
try {
// Call the base class to write the HTTP/2 DATA frame. // Call the base class to write the HTTP/2 DATA frame.
// Note: no need to flush since this is handled by the outbound flow controller. // Note: no need to flush since this is handled by the outbound flow controller.
encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise); encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
} finally {
PerfMark.stopTask("NettyClientHandler.sendGrpcFrame", cmd.stream().tag());
}
}
private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
ChannelPromise promise) {
PerfMark.startTask("NettyClientHandler.sendPingFrame");
msg.getLink().link();
try {
sendPingFrameTraced(ctx, msg, promise);
} finally {
PerfMark.stopTask("NettyClientHandler.sendPingFrame");
}
} }
/** /**
* Sends a PING frame. If a ping operation is already outstanding, the callback in the message is * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
* registered to be called when the existing operation completes, and no new frame is sent. * registered to be called when the existing operation completes, and no new frame is sent.
*/ */
private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg, private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
ChannelPromise promise) { ChannelPromise promise) {
// Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
// but before termination. After termination, messages will no longer arrive because the // but before termination. After termination, messages will no longer arrive because the
@ -690,12 +735,19 @@ class NettyClientHandler extends AbstractNettyHandler {
@Override @Override
public boolean visit(Http2Stream stream) throws Http2Exception { public boolean visit(Http2Stream stream) throws Http2Exception {
NettyClientStream.TransportState clientStream = clientStream(stream); NettyClientStream.TransportState clientStream = clientStream(stream);
Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
PerfMark.startTask("NettyClientHandler.forcefulClose", tag);
msg.getLink().link();
try {
if (clientStream != null) { if (clientStream != null) {
clientStream.transportReportStatus(msg.getStatus(), true, new Metadata()); clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
} }
stream.close(); stream.close();
return true; return true;
} finally {
PerfMark.stopTask("NettyClientHandler.forcefulClose", tag);
}
} }
}); });
promise.setSuccess(); promise.setSuccess();

View File

@ -43,7 +43,9 @@ import io.netty.channel.EventLoop;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.perfmark.Link;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.Tag;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
@ -215,9 +217,20 @@ class NettyClientStream extends AbstractClientStream {
transportState().requestMessagesFromDeframer(numMessages); transportState().requestMessagesFromDeframer(numMessages);
} else { } else {
channel.eventLoop().execute(new Runnable() { channel.eventLoop().execute(new Runnable() {
final Link link = PerfMark.link();
@Override @Override
public void run() { public void run() {
PerfMark.startTask(
"NettyClientStream$Sink.requestMessagesFromDeframer",
transportState().tag());
link.link();
try {
transportState().requestMessagesFromDeframer(numMessages); transportState().requestMessagesFromDeframer(numMessages);
} finally {
PerfMark.stopTask(
"NettyClientStream$Sink.requestMessagesFromDeframer",
transportState().tag());
}
} }
}); });
} }
@ -249,20 +262,25 @@ class NettyClientStream extends AbstractClientStream {
implements StreamIdHolder { implements StreamIdHolder {
private static final int NON_EXISTENT_ID = -1; private static final int NON_EXISTENT_ID = -1;
private final String methodName;
private final NettyClientHandler handler; private final NettyClientHandler handler;
private final EventLoop eventLoop; private final EventLoop eventLoop;
private int id; private int id;
private Http2Stream http2Stream; private Http2Stream http2Stream;
private Tag tag;
public TransportState( public TransportState(
NettyClientHandler handler, NettyClientHandler handler,
EventLoop eventLoop, EventLoop eventLoop,
int maxMessageSize, int maxMessageSize,
StatsTraceContext statsTraceCtx, StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) { TransportTracer transportTracer,
String methodName) {
super(maxMessageSize, statsTraceCtx, transportTracer); super(maxMessageSize, statsTraceCtx, transportTracer);
this.methodName = checkNotNull(methodName, "methodName");
this.handler = checkNotNull(handler, "handler"); this.handler = checkNotNull(handler, "handler");
this.eventLoop = checkNotNull(eventLoop, "eventLoop"); this.eventLoop = checkNotNull(eventLoop, "eventLoop");
tag = PerfMark.createTag(methodName);
} }
@Override @Override
@ -275,6 +293,7 @@ class NettyClientStream extends AbstractClientStream {
checkArgument(id > 0, "id must be positive %s", id); checkArgument(id > 0, "id must be positive %s", id);
checkState(this.id == 0, "id has been previously set: %s", this.id); checkState(this.id == 0, "id has been previously set: %s", this.id);
this.id = id; this.id = id;
this.tag = PerfMark.createTag(methodName, id);
} }
/** /**
@ -359,5 +378,10 @@ class NettyClientStream extends AbstractClientStream {
void transportDataReceived(ByteBuf frame, boolean endOfStream) { void transportDataReceived(ByteBuf frame, boolean endOfStream) {
transportDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream); transportDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream);
} }
@Override
public final Tag tag() {
return tag;
}
} }
} }

View File

@ -176,7 +176,8 @@ class NettyClientTransport implements ConnectionClientTransport {
channel.eventLoop(), channel.eventLoop(),
maxMessageSize, maxMessageSize,
statsTraceCtx, statsTraceCtx,
transportTracer) { transportTracer,
method.getFullMethodName()) {
@Override @Override
protected Status statusFromFailedFuture(ChannelFuture f) { protected Status statusFromFailedFuture(ChannelFuture f) {
return NettyClientTransport.this.statusFromFailedFuture(f); return NettyClientTransport.this.statusFromFailedFuture(f);

View File

@ -83,6 +83,8 @@ import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
import java.util.List; import java.util.List;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
@ -427,7 +429,11 @@ class NettyServerHandler extends AbstractNettyHandler {
http2Stream, http2Stream,
maxMessageSize, maxMessageSize,
statsTraceCtx, statsTraceCtx,
transportTracer); transportTracer,
method);
PerfMark.startTask("NettyServerHandler.onHeadersRead", state.tag());
try {
String authority = getOrUpdateAuthority((AsciiString) headers.authority()); String authority = getOrUpdateAuthority((AsciiString) headers.authority());
NettyServerStream stream = new NettyServerStream( NettyServerStream stream = new NettyServerStream(
ctx.channel(), ctx.channel(),
@ -439,6 +445,9 @@ class NettyServerHandler extends AbstractNettyHandler {
transportListener.streamCreated(stream, method, metadata); transportListener.streamCreated(stream, method, metadata);
state.onStreamAllocated(); state.onStreamAllocated();
http2Stream.setProperty(streamKey, state); http2Stream.setProperty(streamKey, state);
} finally {
PerfMark.stopTask("NettyServerHandler.onHeadersRead", state.tag());
}
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.WARNING, "Exception in onHeadersRead()", e); logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
// Throw an exception that will get handled by onStreamError. // Throw an exception that will get handled by onStreamError.
@ -463,7 +472,12 @@ class NettyServerHandler extends AbstractNettyHandler {
flowControlPing().onDataRead(data.readableBytes(), padding); flowControlPing().onDataRead(data.readableBytes(), padding);
try { try {
NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId)); NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
PerfMark.startTask("NettyServerHandler.onDataRead", stream.tag());
try {
stream.inboundDataReceived(data, endOfStream); stream.inboundDataReceived(data, endOfStream);
} finally {
PerfMark.stopTask("NettyServerHandler.onDataRead", stream.tag());
}
} catch (Throwable e) { } catch (Throwable e) {
logger.log(Level.WARNING, "Exception in onDataRead()", e); logger.log(Level.WARNING, "Exception in onDataRead()", e);
// Throw an exception that will get handled by onStreamError. // Throw an exception that will get handled by onStreamError.
@ -475,8 +489,13 @@ class NettyServerHandler extends AbstractNettyHandler {
try { try {
NettyServerStream.TransportState stream = serverStream(connection().stream(streamId)); NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
if (stream != null) { if (stream != null) {
PerfMark.startTask("NettyServerHandler.onRstStreamRead", stream.tag());
try {
stream.transportReportStatus( stream.transportReportStatus(
Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode)); Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
} finally {
PerfMark.stopTask("NettyServerHandler.onRstStreamRead", stream.tag());
}
} }
} catch (Throwable e) { } catch (Throwable e) {
logger.log(Level.WARNING, "Exception in onRstStreamRead()", e); logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
@ -499,12 +518,18 @@ class NettyServerHandler extends AbstractNettyHandler {
logger.log(Level.WARNING, "Stream Error", cause); logger.log(Level.WARNING, "Stream Error", cause);
NettyServerStream.TransportState serverStream = serverStream( NettyServerStream.TransportState serverStream = serverStream(
connection().stream(Http2Exception.streamId(http2Ex))); connection().stream(Http2Exception.streamId(http2Ex)));
Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
PerfMark.startTask("NettyServerHandler.onStreamError", tag);
try {
if (serverStream != null) { if (serverStream != null) {
serverStream.transportReportStatus(Utils.statusFromThrowable(cause)); serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
} }
// TODO(ejona): Abort the stream by sending headers to help the client with debugging. // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
// Delegate to the base class to send a RST_STREAM. // Delegate to the base class to send a RST_STREAM.
super.onStreamError(ctx, outbound, cause, http2Ex); super.onStreamError(ctx, outbound, cause, http2Ex);
} finally {
PerfMark.stopTask("NettyServerHandler.onStreamError", tag);
}
} }
@Override @Override
@ -623,11 +648,17 @@ class NettyServerHandler extends AbstractNettyHandler {
*/ */
private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
ChannelPromise promise) throws Http2Exception { ChannelPromise promise) throws Http2Exception {
PerfMark.startTask("NettyServerHandler.sendGrpcFrame", cmd.stream().tag());
cmd.getLink().link();
try {
if (cmd.endStream()) { if (cmd.endStream()) {
closeStreamWhenDone(promise, cmd.streamId()); closeStreamWhenDone(promise, cmd.stream().id());
} }
// Call the base class to write the HTTP/2 DATA frame. // Call the base class to write the HTTP/2 DATA frame.
encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise); encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
} finally {
PerfMark.stopTask("NettyServerHandler.sendGrpcFrame", cmd.stream().tag());
}
} }
/** /**
@ -635,8 +666,11 @@ class NettyServerHandler extends AbstractNettyHandler {
*/ */
private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd, private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
ChannelPromise promise) throws Http2Exception { ChannelPromise promise) throws Http2Exception {
// TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296 is PerfMark.startTask("NettyServerHandler.sendResponseHeaders", cmd.stream().tag());
// fixed. cmd.getLink().link();
try {
// TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296
// is fixed.
int streamId = cmd.stream().id(); int streamId = cmd.stream().id();
Http2Stream stream = connection().stream(streamId); Http2Stream stream = connection().stream(streamId);
if (stream == null) { if (stream == null) {
@ -647,14 +681,23 @@ class NettyServerHandler extends AbstractNettyHandler {
closeStreamWhenDone(promise, streamId); closeStreamWhenDone(promise, streamId);
} }
encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise); encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
} finally {
PerfMark.stopTask("NettyServerHandler.sendResponseHeaders", cmd.stream().tag());
}
} }
private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd, private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
ChannelPromise promise) { ChannelPromise promise) {
PerfMark.startTask("NettyServerHandler.cancelStream", cmd.stream().tag());
cmd.getLink().link();
try {
// Notify the listener if we haven't already. // Notify the listener if we haven't already.
cmd.stream().transportReportStatus(cmd.reason()); cmd.stream().transportReportStatus(cmd.reason());
// Terminate the stream. // Terminate the stream.
encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise); encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
} finally {
PerfMark.stopTask("NettyServerHandler.cancelStream", cmd.stream().tag());
}
} }
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
@ -665,8 +708,14 @@ class NettyServerHandler extends AbstractNettyHandler {
public boolean visit(Http2Stream stream) throws Http2Exception { public boolean visit(Http2Stream stream) throws Http2Exception {
NettyServerStream.TransportState serverStream = serverStream(stream); NettyServerStream.TransportState serverStream = serverStream(stream);
if (serverStream != null) { if (serverStream != null) {
PerfMark.startTask("NettyServerHandler.forcefulClose", serverStream.tag());
msg.getLink().link();
try {
serverStream.transportReportStatus(msg.getStatus()); serverStream.transportReportStatus(msg.getStatus());
resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
} finally {
PerfMark.stopTask("NettyServerHandler.forcefulClose", serverStream.tag());
}
} }
stream.close(); stream.close();
return true; return true;

View File

@ -33,7 +33,9 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2Stream;
import io.perfmark.Link;
import io.perfmark.PerfMark; import io.perfmark.PerfMark;
import io.perfmark.Tag;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -98,10 +100,21 @@ class NettyServerStream extends AbstractServerStream {
// Processing data read in the event loop so can call into the deframer immediately // Processing data read in the event loop so can call into the deframer immediately
transportState().requestMessagesFromDeframer(numMessages); transportState().requestMessagesFromDeframer(numMessages);
} else { } else {
final Link link = PerfMark.link();
channel.eventLoop().execute(new Runnable() { channel.eventLoop().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
PerfMark.startTask(
"NettyServerStream$Sink.requestMessagesFromDeframer",
transportState().tag());
link.link();
try {
transportState().requestMessagesFromDeframer(numMessages); transportState().requestMessagesFromDeframer(numMessages);
} finally {
PerfMark.stopTask(
"NettyServerStream$Sink.requestMessagesFromDeframer",
transportState().tag());
}
} }
}); });
} }
@ -195,6 +208,7 @@ class NettyServerStream extends AbstractServerStream {
private final Http2Stream http2Stream; private final Http2Stream http2Stream;
private final NettyServerHandler handler; private final NettyServerHandler handler;
private final EventLoop eventLoop; private final EventLoop eventLoop;
private final Tag tag;
public TransportState( public TransportState(
NettyServerHandler handler, NettyServerHandler handler,
@ -202,11 +216,13 @@ class NettyServerStream extends AbstractServerStream {
Http2Stream http2Stream, Http2Stream http2Stream,
int maxMessageSize, int maxMessageSize,
StatsTraceContext statsTraceCtx, StatsTraceContext statsTraceCtx,
TransportTracer transportTracer) { TransportTracer transportTracer,
String methodName) {
super(maxMessageSize, statsTraceCtx, transportTracer); super(maxMessageSize, statsTraceCtx, transportTracer);
this.http2Stream = checkNotNull(http2Stream, "http2Stream"); this.http2Stream = checkNotNull(http2Stream, "http2Stream");
this.handler = checkNotNull(handler, "handler"); this.handler = checkNotNull(handler, "handler");
this.eventLoop = eventLoop; this.eventLoop = eventLoop;
this.tag = PerfMark.createTag(methodName, http2Stream.id());
} }
@Override @Override
@ -240,6 +256,11 @@ class NettyServerStream extends AbstractServerStream {
public int id() { public int id() {
return http2Stream.id(); return http2Stream.id();
} }
@Override
public Tag tag() {
return tag;
}
} }
@Override @Override

View File

@ -46,8 +46,8 @@ final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQu
return link; return link;
} }
int streamId() { StreamIdHolder stream() {
return stream.id(); return stream;
} }
boolean endStream() { boolean endStream() {
@ -100,7 +100,7 @@ final class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQu
@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName() + "(streamId=" + streamId() return getClass().getSimpleName() + "(streamId=" + stream.id()
+ ", endStream=" + endStream + ", content=" + content() + ", endStream=" + endStream + ", content=" + content()
+ ")"; + ")";
} }

View File

@ -16,10 +16,14 @@
package io.grpc.netty; package io.grpc.netty;
import io.perfmark.Tag;
/** Container for stream ids. */ /** Container for stream ids. */
interface StreamIdHolder { interface StreamIdHolder {
/** /**
* Returns the id. * Returns the id.
*/ */
int id(); int id();
Tag tag();
} }

View File

@ -112,13 +112,7 @@ class WriteQueue {
int i = 0; int i = 0;
boolean flushedOnce = false; boolean flushedOnce = false;
while ((cmd = queue.poll()) != null) { while ((cmd = queue.poll()) != null) {
PerfMark.startTask("WriteQueue.run");
try {
cmd.getLink().link();
cmd.run(channel); cmd.run(channel);
} finally {
PerfMark.stopTask("WriteQueue.run");
}
if (++i == DEQUE_CHUNK_SIZE) { if (++i == DEQUE_CHUNK_SIZE) {
i = 0; i = 0;
// Flush each chunk so we are releasing buffers periodically. In theory this loop // Flush each chunk so we are releasing buffers periodically. In theory this loop

View File

@ -789,7 +789,13 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
EventLoop eventLoop, EventLoop eventLoop,
int maxMessageSize, int maxMessageSize,
TransportTracer transportTracer) { TransportTracer transportTracer) {
super(handler, eventLoop, maxMessageSize, StatsTraceContext.NOOP, transportTracer); super(
handler,
eventLoop,
maxMessageSize,
StatsTraceContext.NOOP,
transportTracer,
"methodName");
} }
@Override @Override

View File

@ -550,7 +550,13 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
private class TransportStateImpl extends NettyClientStream.TransportState { private class TransportStateImpl extends NettyClientStream.TransportState {
public TransportStateImpl(NettyClientHandler handler, int maxMessageSize) { public TransportStateImpl(NettyClientHandler handler, int maxMessageSize) {
super(handler, channel.eventLoop(), maxMessageSize, StatsTraceContext.NOOP, transportTracer); super(
handler,
channel.eventLoop(),
maxMessageSize,
StatsTraceContext.NOOP,
transportTracer,
"methodName");
} }
@Override @Override

View File

@ -287,7 +287,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase<NettyServerStream
TransportTracer transportTracer = new TransportTracer(); TransportTracer transportTracer = new TransportTracer();
NettyServerStream.TransportState state = new NettyServerStream.TransportState( NettyServerStream.TransportState state = new NettyServerStream.TransportState(
handler, channel.eventLoop(), http2Stream, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx, handler, channel.eventLoop(), http2Stream, DEFAULT_MAX_MESSAGE_SIZE, statsTraceCtx,
transportTracer); transportTracer, "method");
NettyServerStream stream = new NettyServerStream(channel, state, Attributes.EMPTY, NettyServerStream stream = new NettyServerStream(channel, state, Attributes.EMPTY,
"test-authority", statsTraceCtx, transportTracer); "test-authority", statsTraceCtx, transportTracer);
stream.transportState().setListener(serverListener); stream.transportState().setListener(serverListener);