Merge pull request #587 from DataDog/mar-kolya/set-netty-continuation-once
Set netty continuation only once
This commit is contained in:
commit
cd9ab3daef
|
@ -62,32 +62,39 @@ public class ChannelFutureListenerInstrumentation extends Instrumenter.Default {
|
||||||
public static class OperationCompleteAdvice {
|
public static class OperationCompleteAdvice {
|
||||||
@Advice.OnMethodEnter
|
@Advice.OnMethodEnter
|
||||||
public static TraceScope activateScope(@Advice.Argument(0) final ChannelFuture future) {
|
public static TraceScope activateScope(@Advice.Argument(0) final ChannelFuture future) {
|
||||||
|
/*
|
||||||
|
Idea here is:
|
||||||
|
- To return scope only if we have captured it.
|
||||||
|
- To capture scope only in case of error.
|
||||||
|
*/
|
||||||
|
final Throwable cause = future.cause();
|
||||||
|
if (cause == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
final TraceScope.Continuation continuation =
|
final TraceScope.Continuation continuation =
|
||||||
future.channel().attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY).get();
|
future
|
||||||
|
.channel()
|
||||||
|
.attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY)
|
||||||
|
.getAndRemove();
|
||||||
if (continuation == null) {
|
if (continuation == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
final TraceScope scope = continuation.activate();
|
final TraceScope scope = continuation.activate();
|
||||||
|
|
||||||
final Throwable cause = future.cause();
|
final Span errorSpan =
|
||||||
if (cause != null) {
|
GlobalTracer.get()
|
||||||
final Span errorSpan =
|
.buildSpan("netty.connect")
|
||||||
GlobalTracer.get()
|
.withTag(Tags.COMPONENT.getKey(), "netty")
|
||||||
.buildSpan("netty.connect")
|
.start();
|
||||||
.withTag(Tags.COMPONENT.getKey(), "netty")
|
Tags.ERROR.set(errorSpan, true);
|
||||||
.start();
|
errorSpan.log(Collections.singletonMap(ERROR_OBJECT, cause));
|
||||||
Tags.ERROR.set(errorSpan, true);
|
errorSpan.finish();
|
||||||
errorSpan.log(Collections.singletonMap(ERROR_OBJECT, cause));
|
|
||||||
errorSpan.finish();
|
|
||||||
}
|
|
||||||
|
|
||||||
return scope;
|
return scope;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||||
public static void deactivateScope(
|
public static void deactivateScope(@Advice.Enter final TraceScope scope) {
|
||||||
@Advice.Enter final TraceScope scope, @Advice.Thrown final Throwable throwable) {
|
|
||||||
if (scope != null) {
|
if (scope != null) {
|
||||||
((Scope) scope).close();
|
((Scope) scope).close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import io.netty.handler.codec.http.HttpRequestEncoder;
|
||||||
import io.netty.handler.codec.http.HttpResponseDecoder;
|
import io.netty.handler.codec.http.HttpResponseDecoder;
|
||||||
import io.netty.handler.codec.http.HttpResponseEncoder;
|
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||||
import io.netty.handler.codec.http.HttpServerCodec;
|
import io.netty.handler.codec.http.HttpServerCodec;
|
||||||
|
import io.netty.util.Attribute;
|
||||||
import io.opentracing.Scope;
|
import io.opentracing.Scope;
|
||||||
import io.opentracing.util.GlobalTracer;
|
import io.opentracing.util.GlobalTracer;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -137,12 +138,15 @@ public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
|
||||||
@Advice.OnMethodEnter
|
@Advice.OnMethodEnter
|
||||||
public static void addParentSpan(@Advice.This final ChannelPipeline pipeline) {
|
public static void addParentSpan(@Advice.This final ChannelPipeline pipeline) {
|
||||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||||
|
|
||||||
if (scope instanceof TraceScope) {
|
if (scope instanceof TraceScope) {
|
||||||
pipeline
|
final TraceScope.Continuation continuation = ((TraceScope) scope).capture();
|
||||||
.channel()
|
if (null != continuation) {
|
||||||
.attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY)
|
final Attribute<TraceScope.Continuation> attribute =
|
||||||
.set(((TraceScope) scope).capture());
|
pipeline.channel().attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY);
|
||||||
|
if (!attribute.compareAndSet(null, continuation)) {
|
||||||
|
continuation.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import static io.opentracing.log.Fields.ERROR_OBJECT;
|
||||||
|
|
||||||
import datadog.trace.api.DDSpanTypes;
|
import datadog.trace.api.DDSpanTypes;
|
||||||
import datadog.trace.api.DDTags;
|
import datadog.trace.api.DDTags;
|
||||||
|
import datadog.trace.context.TraceScope;
|
||||||
import datadog.trace.instrumentation.netty40.AttributeKeys;
|
import datadog.trace.instrumentation.netty40.AttributeKeys;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||||
|
@ -25,6 +26,14 @@ public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapt
|
||||||
ctx.write(msg, prm);
|
ctx.write(msg, prm);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TraceScope scope = null;
|
||||||
|
final TraceScope.Continuation continuation =
|
||||||
|
ctx.channel().attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove();
|
||||||
|
if (continuation != null) {
|
||||||
|
scope = continuation.activate();
|
||||||
|
}
|
||||||
|
|
||||||
final HttpRequest request = (HttpRequest) msg;
|
final HttpRequest request = (HttpRequest) msg;
|
||||||
final InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
final InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||||
|
|
||||||
|
@ -58,5 +67,9 @@ public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapt
|
||||||
span.finish(); // Finish the span manually since finishSpanOnClose was false
|
span.finish(); // Finish the span manually since finishSpanOnClose was false
|
||||||
throw throwable;
|
throw throwable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (null != scope) {
|
||||||
|
scope.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,10 +41,6 @@ public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (scope instanceof TraceScope) {
|
|
||||||
((TraceScope) scope).setAsyncPropagation(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (finishSpan) {
|
if (finishSpan) {
|
||||||
Tags.HTTP_STATUS.set(span, ((HttpResponse) msg).getStatus().code());
|
Tags.HTTP_STATUS.set(span, ((HttpResponse) msg).getStatus().code());
|
||||||
span.finish(); // Finish the span manually since finishSpanOnClose was false
|
span.finish(); // Finish the span manually since finishSpanOnClose was false
|
||||||
|
|
|
@ -62,32 +62,39 @@ public class ChannelFutureListenerInstrumentation extends Instrumenter.Default {
|
||||||
public static class OperationCompleteAdvice {
|
public static class OperationCompleteAdvice {
|
||||||
@Advice.OnMethodEnter
|
@Advice.OnMethodEnter
|
||||||
public static TraceScope activateScope(@Advice.Argument(0) final ChannelFuture future) {
|
public static TraceScope activateScope(@Advice.Argument(0) final ChannelFuture future) {
|
||||||
|
/*
|
||||||
|
Idea here is:
|
||||||
|
- To return scope only if we have captured it.
|
||||||
|
- To capture scope only in case of error.
|
||||||
|
*/
|
||||||
|
final Throwable cause = future.cause();
|
||||||
|
if (cause == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
final TraceScope.Continuation continuation =
|
final TraceScope.Continuation continuation =
|
||||||
future.channel().attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY).get();
|
future
|
||||||
|
.channel()
|
||||||
|
.attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY)
|
||||||
|
.getAndRemove();
|
||||||
if (continuation == null) {
|
if (continuation == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
final TraceScope scope = continuation.activate();
|
final TraceScope scope = continuation.activate();
|
||||||
|
|
||||||
final Throwable cause = future.cause();
|
final Span errorSpan =
|
||||||
if (cause != null) {
|
GlobalTracer.get()
|
||||||
final Span errorSpan =
|
.buildSpan("netty.connect")
|
||||||
GlobalTracer.get()
|
.withTag(Tags.COMPONENT.getKey(), "netty")
|
||||||
.buildSpan("netty.connect")
|
.start();
|
||||||
.withTag(Tags.COMPONENT.getKey(), "netty")
|
Tags.ERROR.set(errorSpan, true);
|
||||||
.start();
|
errorSpan.log(Collections.singletonMap(ERROR_OBJECT, cause));
|
||||||
Tags.ERROR.set(errorSpan, true);
|
errorSpan.finish();
|
||||||
errorSpan.log(Collections.singletonMap(ERROR_OBJECT, cause));
|
|
||||||
errorSpan.finish();
|
|
||||||
}
|
|
||||||
|
|
||||||
return scope;
|
return scope;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||||
public static void deactivateScope(
|
public static void deactivateScope(@Advice.Enter final TraceScope scope) {
|
||||||
@Advice.Enter final TraceScope scope, @Advice.Thrown final Throwable throwable) {
|
|
||||||
if (scope != null) {
|
if (scope != null) {
|
||||||
((Scope) scope).close();
|
((Scope) scope).close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import io.netty.handler.codec.http.HttpRequestEncoder;
|
||||||
import io.netty.handler.codec.http.HttpResponseDecoder;
|
import io.netty.handler.codec.http.HttpResponseDecoder;
|
||||||
import io.netty.handler.codec.http.HttpResponseEncoder;
|
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||||
import io.netty.handler.codec.http.HttpServerCodec;
|
import io.netty.handler.codec.http.HttpServerCodec;
|
||||||
|
import io.netty.util.Attribute;
|
||||||
import io.opentracing.Scope;
|
import io.opentracing.Scope;
|
||||||
import io.opentracing.util.GlobalTracer;
|
import io.opentracing.util.GlobalTracer;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -137,12 +138,15 @@ public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {
|
||||||
@Advice.OnMethodEnter
|
@Advice.OnMethodEnter
|
||||||
public static void addParentSpan(@Advice.This final ChannelPipeline pipeline) {
|
public static void addParentSpan(@Advice.This final ChannelPipeline pipeline) {
|
||||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||||
|
|
||||||
if (scope instanceof TraceScope) {
|
if (scope instanceof TraceScope) {
|
||||||
pipeline
|
final TraceScope.Continuation continuation = ((TraceScope) scope).capture();
|
||||||
.channel()
|
if (null != continuation) {
|
||||||
.attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY)
|
final Attribute<TraceScope.Continuation> attribute =
|
||||||
.set(((TraceScope) scope).capture());
|
pipeline.channel().attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY);
|
||||||
|
if (!attribute.compareAndSet(null, continuation)) {
|
||||||
|
continuation.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import static io.opentracing.log.Fields.ERROR_OBJECT;
|
||||||
|
|
||||||
import datadog.trace.api.DDSpanTypes;
|
import datadog.trace.api.DDSpanTypes;
|
||||||
import datadog.trace.api.DDTags;
|
import datadog.trace.api.DDTags;
|
||||||
|
import datadog.trace.context.TraceScope;
|
||||||
import datadog.trace.instrumentation.netty41.AttributeKeys;
|
import datadog.trace.instrumentation.netty41.AttributeKeys;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||||
|
@ -25,6 +26,14 @@ public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapt
|
||||||
ctx.write(msg, prm);
|
ctx.write(msg, prm);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TraceScope scope = null;
|
||||||
|
final TraceScope.Continuation continuation =
|
||||||
|
ctx.channel().attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove();
|
||||||
|
if (continuation != null) {
|
||||||
|
scope = continuation.activate();
|
||||||
|
}
|
||||||
|
|
||||||
final HttpRequest request = (HttpRequest) msg;
|
final HttpRequest request = (HttpRequest) msg;
|
||||||
final InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
final InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||||
|
|
||||||
|
@ -58,5 +67,9 @@ public class HttpClientRequestTracingHandler extends ChannelOutboundHandlerAdapt
|
||||||
span.finish(); // Finish the span manually since finishSpanOnClose was false
|
span.finish(); // Finish the span manually since finishSpanOnClose was false
|
||||||
throw throwable;
|
throw throwable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (null != scope) {
|
||||||
|
scope.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,10 +41,6 @@ public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (scope instanceof TraceScope) {
|
|
||||||
((TraceScope) scope).setAsyncPropagation(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (finishSpan) {
|
if (finishSpan) {
|
||||||
Tags.HTTP_STATUS.set(span, ((HttpResponse) msg).status().code());
|
Tags.HTTP_STATUS.set(span, ((HttpResponse) msg).status().code());
|
||||||
span.finish(); // Finish the span manually since finishSpanOnClose was false
|
span.finish(); // Finish the span manually since finishSpanOnClose was false
|
||||||
|
|
Loading…
Reference in New Issue