Add option to create span on new netty connection (#3707)
* Create span on new netty connection * add test for connection failure * add comment * remove commented out line * rebase * test fix * review comments * keep connection failure span as client span
This commit is contained in:
parent
ef16e32f95
commit
a04a7a6b72
|
@ -12,7 +12,6 @@ import io.netty.util.concurrent.ProgressiveFuture;
|
|||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.instrumentation.api.caching.Cache;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.common.client.AbstractNettyHttpClientTracer;
|
||||
|
||||
public final class FutureListenerWrappers {
|
||||
// Instead of ContextStore use Cache with weak keys and weak values to store link between original
|
||||
|
@ -64,7 +63,6 @@ public final class FutureListenerWrappers {
|
|||
|
||||
@Override
|
||||
public void operationComplete(Future<?> future) throws Exception {
|
||||
AbstractNettyHttpClientTracer.operationComplete(future);
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
delegate.operationComplete(future);
|
||||
}
|
||||
|
@ -93,7 +91,6 @@ public final class FutureListenerWrappers {
|
|||
|
||||
@Override
|
||||
public void operationComplete(ProgressiveFuture<?> progressiveFuture) throws Exception {
|
||||
AbstractNettyHttpClientTracer.operationComplete(progressiveFuture);
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
delegate.operationComplete(progressiveFuture);
|
||||
}
|
||||
|
|
|
@ -6,25 +6,26 @@
|
|||
package io.opentelemetry.javaagent.instrumentation.netty.common.client;
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
|
||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
|
||||
import static io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyResponseInjectAdapter.SETTER;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_UDP;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.socket.DatagramChannel;
|
||||
import io.netty.handler.codec.http.HttpHeaders;
|
||||
import io.netty.handler.codec.http.HttpResponse;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.propagation.TextMapSetter;
|
||||
import io.opentelemetry.instrumentation.api.config.Config;
|
||||
import io.opentelemetry.instrumentation.api.tracer.HttpClientTracer;
|
||||
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
@ -32,6 +33,10 @@ import org.checkerframework.checker.nullness.qual.Nullable;
|
|||
public abstract class AbstractNettyHttpClientTracer<REQUEST extends AbstractNettyRequestWrapper>
|
||||
extends HttpClientTracer<REQUEST, HttpHeaders, HttpResponse> {
|
||||
|
||||
private static final boolean alwaysCreateConnectSpan =
|
||||
Config.get()
|
||||
.getBooleanProperty("otel.instrumentation.netty.always-create-connect-span", false);
|
||||
|
||||
protected AbstractNettyHttpClientTracer() {
|
||||
super(NetPeerAttributes.INSTANCE);
|
||||
}
|
||||
|
@ -84,39 +89,60 @@ public abstract class AbstractNettyHttpClientTracer<REQUEST extends AbstractNett
|
|||
return SETTER;
|
||||
}
|
||||
|
||||
public static void operationComplete(Future<?> future) {
|
||||
AbstractNettyHttpClientTracer tracer = NettyHttpClientTracerAccess.getTracer();
|
||||
if (tracer == null) {
|
||||
return;
|
||||
public Context startConnectionSpan(Context parentContext, SocketAddress remoteAddress) {
|
||||
if (!alwaysCreateConnectSpan) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!(future instanceof ChannelFuture)) {
|
||||
return;
|
||||
}
|
||||
// If first call to GenericFutureListener#operationComplete has an exception then we
|
||||
// treat it as the cause of connection failure and create a special span for it
|
||||
ChannelFuture channelFuture = (ChannelFuture) future;
|
||||
Context parentContext = tracer.getAndRemoveConnectContext(channelFuture);
|
||||
if (parentContext == null) {
|
||||
return;
|
||||
}
|
||||
Throwable cause = future.cause();
|
||||
if (cause == null) {
|
||||
return;
|
||||
}
|
||||
SpanBuilder spanBuilder = spanBuilder(parentContext, "CONNECT", INTERNAL);
|
||||
NetPeerAttributes.INSTANCE.setNetPeer(spanBuilder, (InetSocketAddress) remoteAddress);
|
||||
|
||||
if (tracer.shouldStartSpan(parentContext, SpanKind.CLIENT)) {
|
||||
tracer.connectionFailure(parentContext, channelFuture.channel(), cause);
|
||||
return parentContext.with(spanBuilder.startSpan());
|
||||
}
|
||||
|
||||
public void endConnectionSpan(
|
||||
Context context,
|
||||
Context parentContext,
|
||||
SocketAddress remoteAddress,
|
||||
Channel channel,
|
||||
Throwable throwable) {
|
||||
if (alwaysCreateConnectSpan) {
|
||||
if (context != null) {
|
||||
// if context is present we started span in startConnectionSpan
|
||||
endConnectionSpan(context, channel, throwable);
|
||||
}
|
||||
} else if (throwable != null && shouldStartSpan(parentContext, CLIENT)) {
|
||||
// if we didn't start span in startConnectionSpan create a span only when the request fails
|
||||
// and when not inside a client span
|
||||
connectionFailure(parentContext, remoteAddress, channel, throwable);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Context getAndRemoveConnectContext(ChannelFuture channelFuture);
|
||||
private void endConnectionSpan(Context context, Channel channel, Throwable throwable) {
|
||||
if (channel != null) {
|
||||
Span span = Span.fromContext(context);
|
||||
span.setAttribute(
|
||||
SemanticAttributes.NET_TRANSPORT, channel instanceof DatagramChannel ? IP_UDP : IP_TCP);
|
||||
NetPeerAttributes.INSTANCE.setNetPeer(span, (InetSocketAddress) channel.remoteAddress());
|
||||
}
|
||||
if (throwable != null) {
|
||||
endExceptionally(context, throwable);
|
||||
} else {
|
||||
end(context);
|
||||
}
|
||||
}
|
||||
|
||||
private void connectionFailure(Context parentContext, Channel channel, Throwable throwable) {
|
||||
private void connectionFailure(
|
||||
Context parentContext, SocketAddress remoteAddress, Channel channel, Throwable throwable) {
|
||||
SpanBuilder spanBuilder = spanBuilder(parentContext, "CONNECT", CLIENT);
|
||||
spanBuilder.setAttribute(
|
||||
SemanticAttributes.NET_TRANSPORT, channel instanceof DatagramChannel ? IP_UDP : IP_TCP);
|
||||
NetPeerAttributes.INSTANCE.setNetPeer(spanBuilder, (InetSocketAddress) channel.remoteAddress());
|
||||
if (channel != null) {
|
||||
spanBuilder.setAttribute(
|
||||
SemanticAttributes.NET_TRANSPORT, channel instanceof DatagramChannel ? IP_UDP : IP_TCP);
|
||||
NetPeerAttributes.INSTANCE.setNetPeer(
|
||||
spanBuilder, (InetSocketAddress) channel.remoteAddress());
|
||||
} else if (remoteAddress != null) {
|
||||
NetPeerAttributes.INSTANCE.setNetPeer(spanBuilder, (InetSocketAddress) remoteAddress);
|
||||
}
|
||||
|
||||
Context context = withClientSpan(parentContext, spanBuilder.startSpan());
|
||||
endExceptionally(context, throwable);
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.netty.common.client;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GenericFutureListener;
|
||||
import io.opentelemetry.context.Context;
|
||||
|
||||
public class ConnectionCompleteListener implements GenericFutureListener<Future<Void>> {
|
||||
private final Context context;
|
||||
private final Context parentContext;
|
||||
|
||||
public ConnectionCompleteListener(Context context, Context parentContext) {
|
||||
this.context = context;
|
||||
this.parentContext = parentContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void operationComplete(Future<Void> future) {
|
||||
AbstractNettyHttpClientTracer tracer = NettyHttpClientTracerAccess.getTracer();
|
||||
if (tracer == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Channel channel = null;
|
||||
if (future instanceof ChannelFuture) {
|
||||
channel = ((ChannelFuture) future).channel();
|
||||
}
|
||||
tracer.endConnectionSpan(context, parentContext, null, channel, future.cause());
|
||||
}
|
||||
}
|
|
@ -29,6 +29,25 @@ dependencies {
|
|||
latestDepTestLibrary("io.netty:netty-codec-http:4.0.56.Final")
|
||||
}
|
||||
|
||||
tasks {
|
||||
val testConnectionSpan by registering(Test::class) {
|
||||
filter {
|
||||
includeTestsMatching("Netty40ConnectionSpanTest")
|
||||
isFailOnNoMatchingTests = false
|
||||
}
|
||||
include("**/Netty40ConnectionSpanTest.*")
|
||||
jvmArgs("-Dotel.instrumentation.netty.always-create-connect-span=true")
|
||||
}
|
||||
|
||||
named<Test>("test") {
|
||||
dependsOn(testConnectionSpan)
|
||||
filter {
|
||||
excludeTestsMatching("Netty40ConnectionSpanTest")
|
||||
isFailOnNoMatchingTests = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We need to force the dependency to the earliest supported version because other libraries declare newer versions.
|
||||
if (!(findProperty("testLatestDeps") as Boolean)) {
|
||||
configurations.configureEach {
|
||||
|
|
|
@ -8,9 +8,16 @@ package io.opentelemetry.javaagent.instrumentation.netty.v4_0;
|
|||
import static io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.NettyHttpClientTracer.tracer;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.common.client.ConnectionCompleteListener;
|
||||
import java.net.SocketAddress;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
@ -25,6 +32,9 @@ public class BootstrapInstrumentation implements TypeInstrumentation {
|
|||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
isConstructor(), BootstrapInstrumentation.class.getName() + "$InitAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
named("doConnect").and(takesArgument(0, SocketAddress.class)),
|
||||
BootstrapInstrumentation.class.getName() + "$ConnectAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
|
@ -37,4 +47,38 @@ public class BootstrapInstrumentation implements TypeInstrumentation {
|
|||
tracer();
|
||||
}
|
||||
}
|
||||
|
||||
public static class ConnectAdvice {
|
||||
@Advice.OnMethodEnter
|
||||
public static void startConnect(
|
||||
@Advice.Argument(0) SocketAddress remoteAddress,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelParentContext") Context parentContext,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
parentContext = Java8BytecodeBridge.currentContext();
|
||||
context = tracer().startConnectionSpan(parentContext, remoteAddress);
|
||||
if (context != null) {
|
||||
scope = context.makeCurrent();
|
||||
}
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void endConnect(
|
||||
@Advice.Thrown Throwable throwable,
|
||||
@Advice.Argument(0) SocketAddress remoteAddress,
|
||||
@Advice.Return ChannelFuture channelFuture,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelParentContext") Context parentContext,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
}
|
||||
|
||||
if (throwable != null) {
|
||||
tracer().endConnectionSpan(context, parentContext, remoteAddress, null, throwable);
|
||||
} else {
|
||||
channelFuture.addListener(new ConnectionCompleteListener(context, parentContext));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ package io.opentelemetry.javaagent.instrumentation.netty.v4_0;
|
|||
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.returns;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.netty.channel.ChannelHandler;
|
||||
|
@ -19,12 +18,9 @@ import io.netty.handler.codec.http.HttpRequestEncoder;
|
|||
import io.netty.handler.codec.http.HttpResponseDecoder;
|
||||
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||
import io.netty.handler.codec.http.HttpServerCodec;
|
||||
import io.netty.util.Attribute;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.CallDepth;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.common.AbstractNettyChannelPipelineInstrumentation;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.HttpClientRequestTracingHandler;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.HttpClientResponseTracingHandler;
|
||||
|
@ -46,9 +42,6 @@ public class NettyChannelPipelineInstrumentation
|
|||
.and(nameStartsWith("add"))
|
||||
.and(takesArgument(2, named("io.netty.channel.ChannelHandler"))),
|
||||
NettyChannelPipelineInstrumentation.class.getName() + "$ChannelPipelineAddAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
isMethod().and(named("connect")).and(returns(named("io.netty.channel.ChannelFuture"))),
|
||||
NettyChannelPipelineInstrumentation.class.getName() + "$ChannelPipelineConnectAdvice");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -103,15 +96,4 @@ public class NettyChannelPipelineInstrumentation
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class ChannelPipelineConnectAdvice {
|
||||
|
||||
@Advice.OnMethodEnter
|
||||
public static void addParentSpan(@Advice.This ChannelPipeline pipeline) {
|
||||
Context context = Java8BytecodeBridge.currentContext();
|
||||
Attribute<Context> attribute = pipeline.channel().attr(AttributeKeys.CONNECT_CONTEXT);
|
||||
attribute.compareAndSet(null, context);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,12 +5,9 @@
|
|||
|
||||
package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client;
|
||||
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.handler.codec.http.HttpResponse;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.common.client.AbstractNettyHttpClientTracer;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyHttpClientTracerAccess;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys;
|
||||
|
||||
public class NettyHttpClientTracer extends AbstractNettyHttpClientTracer<NettyRequestWrapper> {
|
||||
private static final NettyHttpClientTracer TRACER = new NettyHttpClientTracer();
|
||||
|
@ -25,11 +22,6 @@ public class NettyHttpClientTracer extends AbstractNettyHttpClientTracer<NettyRe
|
|||
return TRACER;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Context getAndRemoveConnectContext(ChannelFuture channelFuture) {
|
||||
return channelFuture.channel().attr(AttributeKeys.CONNECT_CONTEXT).getAndRemove();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Integer status(HttpResponse httpResponse) {
|
||||
return httpResponse.getStatus().code();
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
||||
import static io.opentelemetry.api.trace.SpanKind.SERVER
|
||||
import static io.opentelemetry.api.trace.StatusCode.ERROR
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP
|
||||
|
||||
import io.netty.bootstrap.Bootstrap
|
||||
import io.netty.buffer.Unpooled
|
||||
import io.netty.channel.ChannelInitializer
|
||||
import io.netty.channel.ChannelPipeline
|
||||
import io.netty.channel.EventLoopGroup
|
||||
import io.netty.channel.nio.NioEventLoopGroup
|
||||
import io.netty.channel.socket.SocketChannel
|
||||
import io.netty.channel.socket.nio.NioSocketChannel
|
||||
import io.netty.handler.codec.http.DefaultFullHttpRequest
|
||||
import io.netty.handler.codec.http.HttpClientCodec
|
||||
import io.netty.handler.codec.http.HttpHeaders
|
||||
import io.netty.handler.codec.http.HttpMethod
|
||||
import io.netty.handler.codec.http.HttpVersion
|
||||
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
||||
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
|
||||
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
import spock.lang.Shared
|
||||
|
||||
class Netty40ConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait {
|
||||
|
||||
@Shared
|
||||
private HttpClientTestServer server
|
||||
|
||||
@Shared
|
||||
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup()
|
||||
|
||||
@Shared
|
||||
private Bootstrap bootstrap = buildBootstrap()
|
||||
|
||||
def setupSpec() {
|
||||
server = new HttpClientTestServer(openTelemetry)
|
||||
server.start()
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
eventLoopGroup.shutdownGracefully()
|
||||
server.stop()
|
||||
}
|
||||
|
||||
Bootstrap buildBootstrap() {
|
||||
Bootstrap bootstrap = new Bootstrap()
|
||||
bootstrap.group(eventLoopGroup)
|
||||
.channel(NioSocketChannel)
|
||||
.handler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel socketChannel) throws Exception {
|
||||
ChannelPipeline pipeline = socketChannel.pipeline()
|
||||
pipeline.addLast(new HttpClientCodec())
|
||||
}
|
||||
})
|
||||
|
||||
return bootstrap
|
||||
}
|
||||
|
||||
DefaultFullHttpRequest buildRequest(String method, URI uri, Map<String, String> headers) {
|
||||
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.toString(), Unpooled.EMPTY_BUFFER)
|
||||
HttpHeaders.setHost(request, uri.host)
|
||||
headers.each { k, v -> request.headers().set(k, v) }
|
||||
return request
|
||||
}
|
||||
|
||||
int sendRequest(DefaultFullHttpRequest request, URI uri) {
|
||||
def channel = bootstrap.connect(uri.host, uri.port).sync().channel()
|
||||
def result = new CompletableFuture<Integer>()
|
||||
channel.pipeline().addLast(new ClientHandler(result))
|
||||
channel.writeAndFlush(request).get()
|
||||
return result.get(20, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
def "test successful request"() {
|
||||
when:
|
||||
def uri = URI.create("http://localhost:${server.httpPort()}/success")
|
||||
def request = buildRequest("GET", uri, [:])
|
||||
def responseCode = runWithSpan("parent") {
|
||||
sendRequest(request, uri)
|
||||
}
|
||||
|
||||
then:
|
||||
responseCode == 200
|
||||
assertTraces(1) {
|
||||
trace(0, 4) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
kind INTERNAL
|
||||
hasNoParent()
|
||||
}
|
||||
span(1) {
|
||||
name "CONNECT"
|
||||
kind INTERNAL
|
||||
childOf(span(0))
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" uri.host
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" uri.port
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "HTTP GET"
|
||||
kind CLIENT
|
||||
childOf(span(0))
|
||||
}
|
||||
span(3) {
|
||||
name "test-http-server"
|
||||
kind SERVER
|
||||
childOf(span(2))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test failing request"() {
|
||||
when:
|
||||
URI uri = URI.create("http://localhost:${PortUtils.UNUSABLE_PORT}")
|
||||
def request = buildRequest("GET", uri, [:])
|
||||
runWithSpan("parent") {
|
||||
sendRequest(request, uri)
|
||||
}
|
||||
|
||||
then:
|
||||
def thrownException = thrown(Exception)
|
||||
|
||||
and:
|
||||
assertTraces(1) {
|
||||
trace(0, 2) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
kind INTERNAL
|
||||
hasNoParent()
|
||||
status ERROR
|
||||
errorEvent(thrownException.class, thrownException.message)
|
||||
}
|
||||
span(1) {
|
||||
name "CONNECT"
|
||||
kind INTERNAL
|
||||
childOf(span(0))
|
||||
status ERROR
|
||||
errorEvent(thrownException.class, thrownException.message)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" uri.host
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" uri.port
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -43,8 +43,23 @@ dependencies {
|
|||
}
|
||||
|
||||
tasks {
|
||||
val testConnectionSpan by registering(Test::class) {
|
||||
filter {
|
||||
includeTestsMatching("Netty41ConnectionSpanTest")
|
||||
isFailOnNoMatchingTests = false
|
||||
}
|
||||
include("**/Netty41ConnectionSpanTest.*")
|
||||
jvmArgs("-Dotel.instrumentation.netty.always-create-connect-span=true")
|
||||
}
|
||||
|
||||
named<Test>("test") {
|
||||
systemProperty("testLatestDeps", findProperty("testLatestDeps"))
|
||||
|
||||
dependsOn(testConnectionSpan)
|
||||
filter {
|
||||
excludeTestsMatching("Netty41ConnectionSpanTest")
|
||||
isFailOnNoMatchingTests = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,9 +8,16 @@ package io.opentelemetry.javaagent.instrumentation.netty.v4_1;
|
|||
import static io.opentelemetry.javaagent.instrumentation.netty.v4_1.client.NettyHttpClientTracer.tracer;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.common.client.ConnectionCompleteListener;
|
||||
import java.net.SocketAddress;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
@ -25,6 +32,9 @@ public class BootstrapInstrumentation implements TypeInstrumentation {
|
|||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
isConstructor(), BootstrapInstrumentation.class.getName() + "$InitAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
named("doResolveAndConnect").and(takesArgument(0, SocketAddress.class)),
|
||||
BootstrapInstrumentation.class.getName() + "$ConnectAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
|
@ -37,4 +47,38 @@ public class BootstrapInstrumentation implements TypeInstrumentation {
|
|||
tracer();
|
||||
}
|
||||
}
|
||||
|
||||
public static class ConnectAdvice {
|
||||
@Advice.OnMethodEnter
|
||||
public static void startConnect(
|
||||
@Advice.Argument(0) SocketAddress remoteAddress,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelParentContext") Context parentContext,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
parentContext = Java8BytecodeBridge.currentContext();
|
||||
context = tracer().startConnectionSpan(parentContext, remoteAddress);
|
||||
if (context != null) {
|
||||
scope = context.makeCurrent();
|
||||
}
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void endConnect(
|
||||
@Advice.Thrown Throwable throwable,
|
||||
@Advice.Argument(0) SocketAddress remoteAddress,
|
||||
@Advice.Return ChannelFuture channelFuture,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelParentContext") Context parentContext,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
}
|
||||
|
||||
if (throwable != null) {
|
||||
tracer().endConnectionSpan(context, parentContext, remoteAddress, null, throwable);
|
||||
} else {
|
||||
channelFuture.addListener(new ConnectionCompleteListener(context, parentContext));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ package io.opentelemetry.javaagent.instrumentation.netty.v4_1;
|
|||
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.returns;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.netty.channel.ChannelHandler;
|
||||
|
@ -20,13 +19,9 @@ import io.netty.handler.codec.http.HttpRequestEncoder;
|
|||
import io.netty.handler.codec.http.HttpResponseDecoder;
|
||||
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||
import io.netty.handler.codec.http.HttpServerCodec;
|
||||
import io.netty.util.Attribute;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.netty.v4_1.AttributeKeys;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.CallDepth;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.common.AbstractNettyChannelPipelineInstrumentation;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.client.HttpClientRequestTracingHandler;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.client.HttpClientResponseTracingHandler;
|
||||
|
@ -49,9 +44,6 @@ public class NettyChannelPipelineInstrumentation
|
|||
.and(takesArgument(1, String.class))
|
||||
.and(takesArgument(2, named("io.netty.channel.ChannelHandler"))),
|
||||
NettyChannelPipelineInstrumentation.class.getName() + "$ChannelPipelineAddAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
isMethod().and(named("connect")).and(returns(named("io.netty.channel.ChannelFuture"))),
|
||||
NettyChannelPipelineInstrumentation.class.getName() + "$ChannelPipelineConnectAdvice");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -129,14 +121,4 @@ public class NettyChannelPipelineInstrumentation
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class ChannelPipelineConnectAdvice {
|
||||
|
||||
@Advice.OnMethodEnter
|
||||
public static void addParentSpan(@Advice.This ChannelPipeline pipeline) {
|
||||
Attribute<Context> attribute = pipeline.channel().attr(AttributeKeys.CONNECT_CONTEXT);
|
||||
attribute.compareAndSet(null, Java8BytecodeBridge.currentContext());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,11 +5,9 @@
|
|||
|
||||
package io.opentelemetry.javaagent.instrumentation.netty.v4_1.client;
|
||||
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.handler.codec.http.HttpRequest;
|
||||
import io.netty.handler.codec.http.HttpResponse;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.netty.v4_1.AttributeKeys;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.common.client.AbstractNettyHttpClientTracer;
|
||||
import io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyHttpClientTracerAccess;
|
||||
|
||||
|
@ -26,11 +24,6 @@ public class NettyHttpClientTracer extends AbstractNettyHttpClientTracer<NettyRe
|
|||
return TRACER;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Context getAndRemoveConnectContext(ChannelFuture channelFuture) {
|
||||
return channelFuture.channel().attr(AttributeKeys.CONNECT_CONTEXT).getAndRemove();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Integer status(HttpResponse httpResponse) {
|
||||
return httpResponse.status().code();
|
||||
|
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
||||
import static io.opentelemetry.api.trace.SpanKind.SERVER
|
||||
import static io.opentelemetry.api.trace.StatusCode.ERROR
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP
|
||||
|
||||
import io.netty.bootstrap.Bootstrap
|
||||
import io.netty.buffer.Unpooled
|
||||
import io.netty.channel.ChannelInitializer
|
||||
import io.netty.channel.ChannelPipeline
|
||||
import io.netty.channel.EventLoopGroup
|
||||
import io.netty.channel.nio.NioEventLoopGroup
|
||||
import io.netty.channel.socket.SocketChannel
|
||||
import io.netty.channel.socket.nio.NioSocketChannel
|
||||
import io.netty.handler.codec.http.DefaultFullHttpRequest
|
||||
import io.netty.handler.codec.http.HttpClientCodec
|
||||
import io.netty.handler.codec.http.HttpHeaderNames
|
||||
import io.netty.handler.codec.http.HttpMethod
|
||||
import io.netty.handler.codec.http.HttpVersion
|
||||
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
||||
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
|
||||
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.TimeUnit
|
||||
import spock.lang.Shared
|
||||
|
||||
class Netty41ConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait {
|
||||
|
||||
@Shared
|
||||
private HttpClientTestServer server
|
||||
|
||||
@Shared
|
||||
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup()
|
||||
|
||||
@Shared
|
||||
private Bootstrap bootstrap = buildBootstrap()
|
||||
|
||||
def setupSpec() {
|
||||
server = new HttpClientTestServer(openTelemetry)
|
||||
server.start()
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
eventLoopGroup.shutdownGracefully()
|
||||
server.stop()
|
||||
}
|
||||
|
||||
Bootstrap buildBootstrap() {
|
||||
Bootstrap bootstrap = new Bootstrap()
|
||||
bootstrap.group(eventLoopGroup)
|
||||
.channel(NioSocketChannel)
|
||||
.handler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel socketChannel) throws Exception {
|
||||
ChannelPipeline pipeline = socketChannel.pipeline()
|
||||
pipeline.addLast(new HttpClientCodec())
|
||||
}
|
||||
})
|
||||
|
||||
return bootstrap
|
||||
}
|
||||
|
||||
DefaultFullHttpRequest buildRequest(String method, URI uri, Map<String, String> headers) {
|
||||
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.toString(), Unpooled.EMPTY_BUFFER)
|
||||
request.headers().set(HttpHeaderNames.HOST, uri.host)
|
||||
headers.each { k, v -> request.headers().set(k, v) }
|
||||
return request
|
||||
}
|
||||
|
||||
int sendRequest(DefaultFullHttpRequest request, URI uri) {
|
||||
def channel = bootstrap.connect(uri.host, uri.port).sync().channel()
|
||||
def result = new CompletableFuture<Integer>()
|
||||
channel.pipeline().addLast(new ClientHandler(result))
|
||||
channel.writeAndFlush(request).get()
|
||||
return result.get(20, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
def "test successful request"() {
|
||||
when:
|
||||
def uri = URI.create("http://localhost:${server.httpPort()}/success")
|
||||
def request = buildRequest("GET", uri, [:])
|
||||
def responseCode = runWithSpan("parent") {
|
||||
sendRequest(request, uri)
|
||||
}
|
||||
|
||||
then:
|
||||
responseCode == 200
|
||||
assertTraces(1) {
|
||||
trace(0, 4) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
kind INTERNAL
|
||||
hasNoParent()
|
||||
}
|
||||
span(1) {
|
||||
name "CONNECT"
|
||||
kind INTERNAL
|
||||
childOf(span(0))
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" uri.host
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" uri.port
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "HTTP GET"
|
||||
kind CLIENT
|
||||
childOf(span(0))
|
||||
}
|
||||
span(3) {
|
||||
name "test-http-server"
|
||||
kind SERVER
|
||||
childOf(span(2))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test failing request"() {
|
||||
when:
|
||||
URI uri = URI.create("http://localhost:${PortUtils.UNUSABLE_PORT}")
|
||||
def request = buildRequest("GET", uri, [:])
|
||||
runWithSpan("parent") {
|
||||
sendRequest(request, uri)
|
||||
}
|
||||
|
||||
then:
|
||||
def thrownException = thrown(Exception)
|
||||
|
||||
and:
|
||||
assertTraces(1) {
|
||||
trace(0, 2) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
kind INTERNAL
|
||||
hasNoParent()
|
||||
status ERROR
|
||||
errorEvent(thrownException.class, thrownException.message)
|
||||
}
|
||||
span(1) {
|
||||
name "CONNECT"
|
||||
kind INTERNAL
|
||||
childOf(span(0))
|
||||
status ERROR
|
||||
errorEvent(thrownException.class, thrownException.message)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" uri.host
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" uri.port
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -10,8 +10,6 @@ import io.opentelemetry.context.Context;
|
|||
|
||||
public final class AttributeKeys {
|
||||
|
||||
public static final AttributeKey<Context> CONNECT_CONTEXT =
|
||||
AttributeKey.valueOf(AttributeKeys.class, "connect-context");
|
||||
public static final AttributeKey<Context> WRITE_CONTEXT =
|
||||
AttributeKey.valueOf(AttributeKeys.class, "passed-context");
|
||||
|
||||
|
|
|
@ -25,3 +25,22 @@ dependencies {
|
|||
|
||||
latestDepTestLibrary("io.projectreactor.netty:reactor-netty:(,1.0.0)")
|
||||
}
|
||||
|
||||
tasks {
|
||||
val testConnectionSpan by registering(Test::class) {
|
||||
filter {
|
||||
includeTestsMatching("ReactorNettyConnectionSpanTest")
|
||||
isFailOnNoMatchingTests = false
|
||||
}
|
||||
include("**/ReactorNettyConnectionSpanTest.*")
|
||||
jvmArgs("-Dotel.instrumentation.netty.always-create-connect-span=true")
|
||||
}
|
||||
|
||||
named<Test>("test") {
|
||||
dependsOn(testConnectionSpan)
|
||||
filter {
|
||||
excludeTestsMatching("ReactorNettyConnectionSpanTest")
|
||||
isFailOnNoMatchingTests = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
||||
import static io.opentelemetry.api.trace.SpanKind.SERVER
|
||||
import static io.opentelemetry.api.trace.StatusCode.ERROR
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
||||
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
|
||||
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||
import reactor.netty.http.client.HttpClient
|
||||
import spock.lang.Shared
|
||||
|
||||
class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait {
|
||||
|
||||
@Shared
|
||||
private HttpClientTestServer server
|
||||
|
||||
def setupSpec() {
|
||||
server = new HttpClientTestServer(openTelemetry)
|
||||
server.start()
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
server.stop()
|
||||
}
|
||||
|
||||
def "test successful request"() {
|
||||
when:
|
||||
def httpClient = HttpClient.create()
|
||||
def responseCode =
|
||||
runWithSpan("parent") {
|
||||
httpClient.get().uri("http://localhost:${server.httpPort()}/success")
|
||||
.responseSingle { resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
}
|
||||
.block()
|
||||
.status().code()
|
||||
}
|
||||
|
||||
then:
|
||||
responseCode == 200
|
||||
assertTraces(1) {
|
||||
trace(0, 4) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
kind INTERNAL
|
||||
hasNoParent()
|
||||
}
|
||||
span(1) {
|
||||
name "CONNECT"
|
||||
kind INTERNAL
|
||||
childOf(span(0))
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort()
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "HTTP GET"
|
||||
kind CLIENT
|
||||
childOf(span(0))
|
||||
}
|
||||
span(3) {
|
||||
name "test-http-server"
|
||||
kind SERVER
|
||||
childOf(span(2))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test failing request"() {
|
||||
when:
|
||||
def httpClient = HttpClient.create()
|
||||
runWithSpan("parent") {
|
||||
httpClient.get().uri("http://localhost:${PortUtils.UNUSABLE_PORT}")
|
||||
.responseSingle { resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
}
|
||||
.block()
|
||||
.status().code()
|
||||
}
|
||||
|
||||
then:
|
||||
def thrownException = thrown(Exception)
|
||||
def connectException = thrownException.getCause()
|
||||
|
||||
and:
|
||||
assertTraces(1) {
|
||||
trace(0, 2) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
kind INTERNAL
|
||||
hasNoParent()
|
||||
status ERROR
|
||||
errorEvent(thrownException.class, thrownException.message)
|
||||
}
|
||||
span(1) {
|
||||
name "CONNECT"
|
||||
kind INTERNAL
|
||||
childOf(span(0))
|
||||
status ERROR
|
||||
errorEvent(connectException.class, connectException.message)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" PortUtils.UNUSABLE_PORT
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -24,3 +24,22 @@ dependencies {
|
|||
testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent"))
|
||||
testInstrumentation(project(":instrumentation:reactor-3.1:javaagent"))
|
||||
}
|
||||
|
||||
tasks {
|
||||
val testConnectionSpan by registering(Test::class) {
|
||||
filter {
|
||||
includeTestsMatching("ReactorNettyConnectionSpanTest")
|
||||
isFailOnNoMatchingTests = false
|
||||
}
|
||||
include("**/ReactorNettyConnectionSpanTest.*")
|
||||
jvmArgs("-Dotel.instrumentation.reactor-netty.always-create-connect-span=true")
|
||||
}
|
||||
|
||||
named<Test>("test") {
|
||||
dependsOn(testConnectionSpan)
|
||||
filter {
|
||||
excludeTestsMatching("ReactorNettyConnectionSpanTest")
|
||||
isFailOnNoMatchingTests = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettyTracer.tracer;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.opentelemetry.context.Context;
|
||||
import java.net.SocketAddress;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class ConnectionWrapper {
|
||||
|
||||
public static Mono<Channel> wrap(
|
||||
Context context, Context parentContext, SocketAddress remoteAddress, Mono<Channel> mono) {
|
||||
return mono.doOnError(
|
||||
throwable -> {
|
||||
tracer().endConnectionSpan(context, parentContext, remoteAddress, null, throwable);
|
||||
})
|
||||
.doOnSuccess(
|
||||
channel -> {
|
||||
if (context != null) {
|
||||
tracer().endConnectionSpan(context, parentContext, remoteAddress, channel, null);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -6,7 +6,7 @@
|
|||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.Arrays.asList;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
|
||||
|
@ -38,6 +38,6 @@ public class ReactorNettyInstrumentationModule extends InstrumentationModule {
|
|||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return singletonList(new HttpClientInstrumentation());
|
||||
return asList(new HttpClientInstrumentation(), new TransportConnectorInstrumentation());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
|
||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_UDP;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.socket.DatagramChannel;
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.config.Config;
|
||||
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
|
||||
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
public class ReactorNettyTracer extends BaseTracer {
|
||||
private static final ReactorNettyTracer TRACER = new ReactorNettyTracer();
|
||||
|
||||
private static final boolean alwaysCreateConnectSpan =
|
||||
Config.get()
|
||||
.getBooleanProperty(
|
||||
"otel.instrumentation.reactor-netty.always-create-connect-span", false);
|
||||
|
||||
protected ReactorNettyTracer() {
|
||||
super(GlobalOpenTelemetry.get());
|
||||
}
|
||||
|
||||
public static ReactorNettyTracer tracer() {
|
||||
return TRACER;
|
||||
}
|
||||
|
||||
public Context startConnectionSpan(Context parentContext, SocketAddress remoteAddress) {
|
||||
if (!alwaysCreateConnectSpan) {
|
||||
return null;
|
||||
}
|
||||
|
||||
SpanBuilder spanBuilder = spanBuilder(parentContext, "CONNECT", INTERNAL);
|
||||
NetPeerAttributes.INSTANCE.setNetPeer(spanBuilder, (InetSocketAddress) remoteAddress);
|
||||
|
||||
return parentContext.with(spanBuilder.startSpan());
|
||||
}
|
||||
|
||||
public void endConnectionSpan(
|
||||
Context context,
|
||||
Context parentContext,
|
||||
SocketAddress remoteAddress,
|
||||
Channel channel,
|
||||
Throwable throwable) {
|
||||
if (alwaysCreateConnectSpan) {
|
||||
if (context != null) {
|
||||
// if context is present we started span in startConnectionSpan
|
||||
endConnectionSpan(context, channel, throwable);
|
||||
}
|
||||
} else if (throwable != null && shouldStartSpan(parentContext, CLIENT)) {
|
||||
// if we didn't start span in startConnectionSpan create a span only when the request fails
|
||||
// and when not inside a client span
|
||||
connectionFailure(parentContext, remoteAddress, channel, throwable);
|
||||
}
|
||||
}
|
||||
|
||||
private void endConnectionSpan(Context context, Channel channel, Throwable throwable) {
|
||||
if (channel != null) {
|
||||
Span span = Span.fromContext(context);
|
||||
span.setAttribute(
|
||||
SemanticAttributes.NET_TRANSPORT, channel instanceof DatagramChannel ? IP_UDP : IP_TCP);
|
||||
NetPeerAttributes.INSTANCE.setNetPeer(span, (InetSocketAddress) channel.remoteAddress());
|
||||
}
|
||||
if (throwable != null) {
|
||||
endExceptionally(context, throwable);
|
||||
} else {
|
||||
end(context);
|
||||
}
|
||||
}
|
||||
|
||||
private void connectionFailure(
|
||||
Context parentContext, SocketAddress remoteAddress, Channel channel, Throwable throwable) {
|
||||
SpanBuilder spanBuilder = spanBuilder(parentContext, "CONNECT", CLIENT);
|
||||
if (channel != null) {
|
||||
spanBuilder.setAttribute(
|
||||
SemanticAttributes.NET_TRANSPORT, channel instanceof DatagramChannel ? IP_UDP : IP_TCP);
|
||||
NetPeerAttributes.INSTANCE.setNetPeer(
|
||||
spanBuilder, (InetSocketAddress) channel.remoteAddress());
|
||||
} else if (remoteAddress != null) {
|
||||
NetPeerAttributes.INSTANCE.setNetPeer(spanBuilder, (InetSocketAddress) remoteAddress);
|
||||
}
|
||||
|
||||
Context context = withClientSpan(parentContext, spanBuilder.startSpan());
|
||||
endExceptionally(context, throwable);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getInstrumentationName() {
|
||||
return "io.opentelemetry.reactor-netty-1.0";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettyTracer.tracer;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
|
||||
import java.net.SocketAddress;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class TransportConnectorInstrumentation implements TypeInstrumentation {
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return named("reactor.netty.transport.TransportConnector");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("connect").and(takesArgument(1, named("java.net.SocketAddress"))),
|
||||
TransportConnectorInstrumentation.class.getName() + "$ConnectAdvice");
|
||||
}
|
||||
|
||||
public static class ConnectAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void startConnect(
|
||||
@Advice.Argument(1) SocketAddress remoteAddress,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelParentContext") Context parentContext,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
parentContext = Java8BytecodeBridge.currentContext();
|
||||
context = tracer().startConnectionSpan(parentContext, remoteAddress);
|
||||
if (context != null) {
|
||||
scope = context.makeCurrent();
|
||||
}
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void endConnect(
|
||||
@Advice.Thrown Throwable throwable,
|
||||
@Advice.Argument(1) SocketAddress remoteAddress,
|
||||
@Advice.Return(readOnly = false) Mono<Channel> mono,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelParentContext") Context parentContext,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
}
|
||||
|
||||
if (throwable != null) {
|
||||
tracer().endConnectionSpan(context, parentContext, remoteAddress, null, throwable);
|
||||
} else {
|
||||
mono = ConnectionWrapper.wrap(context, parentContext, remoteAddress, mono);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
||||
import static io.opentelemetry.api.trace.SpanKind.SERVER
|
||||
import static io.opentelemetry.api.trace.StatusCode.ERROR
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
||||
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
|
||||
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||
import reactor.netty.http.client.HttpClient
|
||||
import spock.lang.Shared
|
||||
|
||||
class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait {
|
||||
|
||||
@Shared
|
||||
private HttpClientTestServer server
|
||||
|
||||
def setupSpec() {
|
||||
server = new HttpClientTestServer(openTelemetry)
|
||||
server.start()
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
server.stop()
|
||||
}
|
||||
|
||||
def "test successful request"() {
|
||||
when:
|
||||
def httpClient = HttpClient.create()
|
||||
def responseCode =
|
||||
runWithSpan("parent") {
|
||||
httpClient.get().uri("http://localhost:${server.httpPort()}/success")
|
||||
.responseSingle { resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
}
|
||||
.block()
|
||||
.status().code()
|
||||
}
|
||||
|
||||
then:
|
||||
responseCode == 200
|
||||
assertTraces(1) {
|
||||
trace(0, 4) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
kind INTERNAL
|
||||
hasNoParent()
|
||||
}
|
||||
span(1) {
|
||||
name "CONNECT"
|
||||
kind INTERNAL
|
||||
childOf(span(0))
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort()
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1"
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "HTTP GET"
|
||||
kind CLIENT
|
||||
childOf(span(0))
|
||||
}
|
||||
span(3) {
|
||||
name "test-http-server"
|
||||
kind SERVER
|
||||
childOf(span(2))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test failing request"() {
|
||||
when:
|
||||
def httpClient = HttpClient.create()
|
||||
runWithSpan("parent") {
|
||||
httpClient.get().uri("http://localhost:${PortUtils.UNUSABLE_PORT}")
|
||||
.responseSingle { resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
}
|
||||
.block()
|
||||
.status().code()
|
||||
}
|
||||
|
||||
then:
|
||||
def thrownException = thrown(Exception)
|
||||
def connectException = thrownException.getCause()
|
||||
|
||||
and:
|
||||
assertTraces(1) {
|
||||
trace(0, 2) {
|
||||
span(0) {
|
||||
name "parent"
|
||||
kind INTERNAL
|
||||
hasNoParent()
|
||||
status ERROR
|
||||
errorEvent(thrownException.class, thrownException.message)
|
||||
}
|
||||
span(1) {
|
||||
name "CONNECT"
|
||||
kind INTERNAL
|
||||
childOf(span(0))
|
||||
status ERROR
|
||||
errorEvent(connectException.class, connectException.message)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" PortUtils.UNUSABLE_PORT
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue