feat: instruments finagle's netty-based stack (#10141)

Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
This commit is contained in:
Dan Markwat 2024-02-15 08:39:48 -07:00 committed by GitHub
parent 6ec0d02715
commit 205100e1dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 1341 additions and 91 deletions

View File

@ -59,6 +59,7 @@ These are the supported libraries and frameworks:
| [Elasticsearch API Client](https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/index.html) | 7.16+ | N/A | [Elasticsearch Client Spans] |
| [Elasticsearch REST Client](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/index.html) | 5.0+ | N/A | [Database Client Spans] |
| [Elasticsearch Transport Client](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html) | 5.0+ | N/A | [Database Client Spans] |
| [Finagle](https://github.com/twitter/finagle) | 23.11+ | N/A | Provides `http.route` [2] |
| [Finatra](https://github.com/twitter/finatra) | 2.9+ | N/A | Provides `http.route` [2], Controller Spans [3] |
| [Geode Client](https://geode.apache.org/) | 1.4+ | N/A | [Database Client Spans] |
| [Google HTTP Client](https://github.com/googleapis/google-http-java-client) | 1.19+ | N/A | [HTTP Client Spans], [HTTP Client Metrics] |

View File

@ -0,0 +1,48 @@
plugins {
id("otel.javaagent-instrumentation")
id("otel.scala-conventions")
}
muzzle {
pass {
group.set("com.twitter")
module.set("finagle-http_2.12")
versions.set("[23.11.0,]")
}
pass {
group.set("com.twitter")
module.set("finagle-http_2.13")
versions.set("[23.11.0,]")
}
}
val finagleVersion = "23.11.0"
val scalaVersion = "2.13.10"
val scalaMinor = Regex("""^([0-9]+\.[0-9]+)\.?.*$""").find(scalaVersion)!!.run {
val (minorVersion) = this.destructured
minorVersion
}
val scalified = fun(pack: String): String {
return "${pack}_$scalaMinor"
}
dependencies {
library("${scalified("com.twitter:finagle-http")}:$finagleVersion")
// should wire netty contexts
testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent"))
implementation(project(":instrumentation:netty:netty-4.1:javaagent"))
implementation(project(":instrumentation:netty:netty-4.1:library"))
implementation(project(":instrumentation:netty:netty-4-common:library"))
}
tasks {
test {
jvmArgs("-Dotel.instrumentation.http.client.emit-experimental-telemetry=true")
jvmArgs("-Dotel.instrumentation.http.server.emit-experimental-telemetry=true")
}
}

View File

@ -0,0 +1,17 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package com.twitter.finagle;
import com.twitter.finagle.netty4.transport.ChannelTransport;
/** Exposes the finagle-internal {@link ChannelTransport#HandlerName()}. */
public final class ChannelTransportHelpers {
private ChannelTransportHelpers() {}
public static String getHandlerName() {
return ChannelTransport.HandlerName();
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.netty.channel;
/** Exists to correctly expose and propagate the {@link #initChannel(Channel)} calls. */
public abstract class OpenTelemetryChannelInitializerDelegate<T extends Channel>
extends ChannelInitializer<T> {
private final ChannelInitializer<T> initializer;
public OpenTelemetryChannelInitializerDelegate(ChannelInitializer<T> initializer) {
this.initializer = initializer;
}
@Override
protected void initChannel(T t) throws Exception {
initializer.initChannel(t);
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.v23_11;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import scala.Option;
public class ChannelTransportInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("com.twitter.finagle.netty4.transport.ChannelTransport");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(named("write")),
ChannelTransportInstrumentation.class.getName() + "$WriteAdvice");
}
@SuppressWarnings("unused")
public static class WriteAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void methodEnter(@Advice.Local("otelScope") Scope scope) {
Option<Context> ref = Helpers.CONTEXT_LOCAL.apply();
if (ref.isDefined()) {
scope = ref.get().makeCurrent();
}
}
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void methodExit(
@Advice.Local("otelScope") Scope scope, @Advice.Thrown Throwable thrown) {
if (scope != null) {
scope.close();
}
}
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.v23_11;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.Arrays;
import java.util.List;
@AutoService(InstrumentationModule.class)
public class FinagleCoreInstrumentationModule extends InstrumentationModule {
public FinagleCoreInstrumentationModule() {
super("finagle-http");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Arrays.asList(
new GenStreamingServerDispatcherInstrumentation(),
new ChannelTransportInstrumentation(),
new H2StreamChannelInitInstrumentation());
}
@Override
public boolean isHelperClass(String className) {
return className.equals("com.twitter.finagle.ChannelTransportHelpers")
|| className.equals("io.netty.channel.OpenTelemetryChannelInitializerDelegate");
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.v23_11;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasSuperType;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class GenStreamingServerDispatcherInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return hasSuperType(named("com.twitter.finagle.http.GenStreamingSerialServerDispatcher"));
}
@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
return hasClassesNamed("com.twitter.finagle.http.GenStreamingSerialServerDispatcher");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(named("loop")),
GenStreamingServerDispatcherInstrumentation.class.getName() + "$LoopAdvice");
}
@SuppressWarnings("unused")
public static class LoopAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void methodEnter() {
// this works bc at this point in the server evaluation, the netty
// instrumentation has already gone to work and assigned the context to the
// local thread;
//
// this works specifically in finagle's netty stack bc at this point the loop()
// method is running on a netty thread with the necessary access to the
// java-native ThreadLocal where the Context is stored
Helpers.CONTEXT_LOCAL.update(Context.current());
}
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void methodExit(@Advice.Thrown Throwable thrown) {
// always clear this
Helpers.CONTEXT_LOCAL.clear();
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.v23_11;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class H2StreamChannelInitInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// scala object instance -- append $ to name
return named("com.twitter.finagle.http2.transport.common.H2StreamChannelInit$");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(named("initServer"))
.and(returns(named("io.netty.channel.ChannelInitializer"))),
H2StreamChannelInitInstrumentation.class.getName() + "$InitServerAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(named("initClient"))
.and(returns(named("io.netty.channel.ChannelInitializer"))),
H2StreamChannelInitInstrumentation.class.getName() + "$InitClientAdvice");
}
@SuppressWarnings("unused")
public static class InitServerAdvice {
@Advice.OnMethodExit
public static void handleExit(
@Advice.Return(readOnly = false) ChannelInitializer<Channel> initializer) {
initializer = Helpers.wrapServer(initializer);
}
}
@SuppressWarnings("unused")
public static class InitClientAdvice {
@Advice.OnMethodExit
public static void handleExit(
@Advice.Return(readOnly = false) ChannelInitializer<Channel> initializer) {
initializer = Helpers.wrapClient(initializer);
}
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.v23_11;
import static io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientRequestTracingHandler.HTTP_CLIENT_REQUEST;
import static io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyClientSingletons.clientHandlerFactory;
import com.twitter.util.Local;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.OpenTelemetryChannelInitializerDelegate;
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientTracingHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerTracingHandler;
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyHttpServerResponseBeforeCommitHandler;
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyServerSingletons;
import java.util.Deque;
public final class Helpers {
private Helpers() {}
public static final Local<Context> CONTEXT_LOCAL = new Local<>();
public static <C extends Channel> ChannelInitializer<C> wrapServer(ChannelInitializer<C> inner) {
return new OpenTelemetryChannelInitializerDelegate<C>(inner) {
@Override
protected void initChannel(C channel) throws Exception {
// do all the needful up front, as this may add necessary handlers -- see below
super.initChannel(channel);
// the parent channel is the original http/1.1 channel and has the contexts stored in it;
// we assign to this new channel as the old one will not be evaluated in the upgraded h2c
// chain
Deque<ServerContext> serverContexts =
channel.parent().attr(AttributeKeys.SERVER_CONTEXT).get();
channel.attr(AttributeKeys.SERVER_CONTEXT).set(serverContexts);
// todo add way to propagate the protocol version override up to the netty instrumentation;
// why: the netty instrumentation extracts the http protocol version from the HttpRequest
// object which in this case is _always_ http/1.1 due to the use of this adapter codec,
// Http2StreamFrameToHttpObjectCodec
ChannelHandlerContext codecCtx =
channel.pipeline().context(Http2StreamFrameToHttpObjectCodec.class);
if (codecCtx != null) {
if (channel.pipeline().get(HttpServerTracingHandler.class) == null) {
VirtualField<ChannelHandler, ChannelHandler> virtualField =
VirtualField.find(ChannelHandler.class, ChannelHandler.class);
ChannelHandler ourHandler =
NettyServerSingletons.serverTelemetry()
.createCombinedHandler(NettyHttpServerResponseBeforeCommitHandler.INSTANCE);
channel
.pipeline()
.addAfter(codecCtx.name(), ourHandler.getClass().getName(), ourHandler);
// attach this in this way to match up with how netty instrumentation expects things
virtualField.set(codecCtx.handler(), ourHandler);
}
}
}
};
}
public static <C extends Channel> ChannelInitializer<C> wrapClient(ChannelInitializer<C> inner) {
return new OpenTelemetryChannelInitializerDelegate<C>(inner) {
// wraps everything for roughly the same reasons as in wrapServer(), above
@Override
protected void initChannel(C channel) throws Exception {
super.initChannel(channel);
channel
.attr(AttributeKeys.CLIENT_PARENT_CONTEXT)
.set(channel.parent().attr(AttributeKeys.CLIENT_PARENT_CONTEXT).get());
channel
.attr(AttributeKeys.CLIENT_CONTEXT)
.set(channel.parent().attr(AttributeKeys.CLIENT_CONTEXT).get());
channel.attr(HTTP_CLIENT_REQUEST).set(channel.parent().attr(HTTP_CLIENT_REQUEST).get());
// todo add way to propagate the protocol version override up to the netty instrumentation;
// why: the netty instrumentation extracts the http protocol version from the HttpRequest
// object which in this case is _always_ http/1.1 due to the use of this adapter codec,
// Http2StreamFrameToHttpObjectCodec
ChannelHandlerContext codecCtx =
channel.pipeline().context(Http2StreamFrameToHttpObjectCodec.class);
if (codecCtx != null) {
if (channel.pipeline().get(HttpClientTracingHandler.class) == null) {
VirtualField<ChannelHandler, ChannelHandler> virtualField =
VirtualField.find(ChannelHandler.class, ChannelHandler.class);
ChannelHandler ourHandler = clientHandlerFactory().createCombinedHandler();
channel
.pipeline()
.addAfter(codecCtx.name(), ourHandler.getClass().getName(), ourHandler);
// attach this in this way to match up with how netty instrumentation expects things
virtualField.set(codecCtx.handler(), ourHandler);
}
}
}
};
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.finagle.v23_11;
import static io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions.DEFAULT_HTTP_ATTRIBUTES;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.NOT_FOUND;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS;
import com.google.common.collect.Sets;
import com.twitter.finagle.ListeningServer;
import com.twitter.finagle.Service;
import com.twitter.finagle.http.Request;
import com.twitter.finagle.http.Response;
import com.twitter.finagle.http.Status;
import com.twitter.io.Buf;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.logging.Logging;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest;
import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions;
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint;
import io.opentelemetry.semconv.SemanticAttributes;
import java.net.URI;
import java.util.Collections;
import org.junit.jupiter.api.extension.RegisterExtension;
abstract class AbstractServerTest extends AbstractHttpServerTest<ListeningServer> {
@RegisterExtension
static final InstrumentationExtension testing = HttpServerInstrumentationExtension.forAgent();
@Override
protected void configure(HttpServerTestOptions options) {
super.configure(options);
options.setTestException(false);
options.setHttpAttributes(
unused ->
Sets.difference(
DEFAULT_HTTP_ATTRIBUTES, Collections.singleton(SemanticAttributes.HTTP_ROUTE)));
options.setTestCaptureHttpHeaders(true);
}
@Override
protected void stopServer(ListeningServer server) throws Exception {
Await.ready(server.close(), Duration.fromSeconds(2));
}
static class TestService extends Service<Request, Response> implements Logging {
@Override
public Future<Response> apply(Request request) {
URI uri = URI.create(request.uri());
ServerEndpoint endpoint = ServerEndpoint.forPath(uri.getPath());
return controller(
endpoint,
() -> {
Response response = Response.apply().status(Status.apply(endpoint.getStatus()));
if (SUCCESS.equals(endpoint) || ERROR.equals(endpoint)) {
response.content(Buf.Utf8$.MODULE$.apply(endpoint.getBody()));
} else if (INDEXED_CHILD.equals(endpoint)) {
endpoint.collectSpanAttributes(
name ->
new QueryStringDecoder(uri)
.parameters().get(name).stream().findFirst().orElse(""));
response.content(Buf.Empty());
} else if (QUERY_PARAM.equals(endpoint)) {
response.content(Buf.Utf8$.MODULE$.apply(uri.getQuery()));
} else if (REDIRECT.equals(endpoint)) {
response.content(Buf.Empty());
response.headerMap().put(HttpHeaderNames.LOCATION.toString(), endpoint.getBody());
} else if (CAPTURE_HEADERS.equals(endpoint)) {
response.content(Buf.Utf8$.MODULE$.apply(endpoint.getBody()));
response
.headerMap()
.set("X-Test-Response", request.headerMap().get("X-Test-Request").get());
} else if (EXCEPTION.equals(endpoint)) {
throw new IllegalStateException(endpoint.getBody());
} else {
response.content(Buf.Utf8$.MODULE$.apply(NOT_FOUND.getBody()));
response = Response.apply().status(Status.apply(NOT_FOUND.getStatus()));
}
return Future.value(response);
});
}
}
}

View File

@ -0,0 +1,208 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.finagle.v23_11;
import static io.opentelemetry.javaagent.instrumentation.finagle.v23_11.Utils.createClient;
import static org.assertj.core.api.Assertions.assertThat;
import com.twitter.finagle.ConnectionFailedException;
import com.twitter.finagle.Failure;
import com.twitter.finagle.ReadTimedOutException;
import com.twitter.finagle.Service;
import com.twitter.finagle.http.Request;
import com.twitter.finagle.http.Response;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.FuturePool;
import com.twitter.util.Time;
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import io.netty.handler.timeout.ReadTimeoutException;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
import io.opentelemetry.javaagent.instrumentation.finagle.v23_11.Utils.ClientType;
import io.opentelemetry.semconv.SemanticAttributes;
import java.net.ConnectException;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.assertj.core.api.AbstractThrowableAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.RegisterExtension;
/**
* Tests relevant client functionality.
*
* @implNote Why no http/2 tests: finagle maps everything down to http/1.1 via netty's own {@link
* Http2StreamFrameToHttpObjectCodec} which results in the same code path execution through
* finagle's netty stack. While testing would undoubtedly be beneficial, it's at this time
* untested due to lack of concrete support from the otel instrumentation test framework and
* upstream netty instrumentation, both.
*/
// todo implement http/2-specific tests;
// otel test framework doesn't support an http/2 server out of the box
class ClientTest extends AbstractHttpClientTest<Request> {
@RegisterExtension
static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent();
private final Map<ClientType, Service<Request, Response>> clients = new ConcurrentHashMap<>();
// finagle Services are closeable, but are bound to a host + port;
// as these are only known during the invocation of the test, each test must create and then
// tear down their respective Services.
//
// however, the underlying netty bits are reused between Services by default, so "close"
// works out to a more "virtual" operation than with other client libraries.
@AfterEach
void tearDown() throws Exception {
for (Service<Request, Response> client : clients.values()) {
Await.ready(client.close(Time.fromSeconds(10)));
}
clients.clear();
}
private Service<Request, Response> getClient(URI uri) {
return getClient(uri, uri.getScheme().equals("https") ? ClientType.TLS : ClientType.DEFAULT);
}
private Service<Request, Response> getClient(URI uri, ClientType clientType) {
return clients.computeIfAbsent(
clientType,
(type) -> createClient(type).newService(uri.getHost() + ":" + Utils.safePort(uri)));
}
private Future<Response> doSendRequest(Request request, URI uri) {
// push this onto a FuturePool for 2 reasons:
// 1) forces the request handling onto a different thread, ensuring test accuracy
// 2) using the default thread can mess with high concurrency scenarios
return FuturePool.unboundedPool()
.apply(
() -> {
try {
return Await.result(getClient(uri).apply(request));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Override
protected void configure(HttpClientTestOptions.Builder optionsBuilder) {
optionsBuilder.setSingleConnectionFactory(
(host, port) -> {
URI uri = URI.create(String.format("http://%s:%d", host, port));
Service<Request, Response> svc = getClient(uri, ClientType.SINGLE_CONN);
return (path, headers) -> {
// this is synchronized bc so is the Netty one;
// seems like the use of a "single" (presumably-queueing) connection would do this
// automatically, but apparently not
synchronized (svc) {
Request get = buildRequest("GET", URI.create(uri + path), headers);
return Await.result(svc.apply(get), Duration.fromSeconds(20)).statusCode();
}
};
});
optionsBuilder.setHttpAttributes(ClientTest::getHttpAttributes);
optionsBuilder.setExpectedClientSpanNameMapper(ClientTest::getExpectedClientSpanName);
optionsBuilder.disableTestRedirects();
optionsBuilder.setClientSpanErrorMapper(
(uri, error) -> {
// all errors should be wrapped in RuntimeExceptions due to how we run things in
// doSendRequest()
AbstractThrowableAssert<?, ?> clientWrapAssert =
assertThat(error).isInstanceOf(RuntimeException.class);
if ("http://localhost:61/".equals(uri.toString())
|| "https://192.0.2.1/".equals(uri.toString())) {
// finagle handles all these in com.twitter.finagle.netty4.ConnectionBuilder.build();
// all errors emitted by the netty Bootstrap.connect() call are mapped to
// twitter/finagle exceptions and handled accordingly;
// namely, this means wrapping the root exception in a finagle
// ConnectionFailedException
// and then with a twitter Failure.rejected() call, resulting in the multiple nestings
// of the root exception
clientWrapAssert
.cause()
.isInstanceOf(Failure.class)
.cause()
.isInstanceOf(ConnectionFailedException.class)
.cause()
.isInstanceOf(ConnectException.class);
error = error.getCause().getCause().getCause();
} else if (uri.getPath().endsWith("/read-timeout")) {
// not a connect() exception like the above, so is not wrapped as above;
clientWrapAssert.cause().isInstanceOf(ReadTimedOutException.class);
// however, this specific case results in a mapping from netty's ReadTimeoutException
// to finagle's ReadTimedOutException in the finagle client code, losing all trace of
// the original exception; so we must construct it manually here
error = new ReadTimeoutException();
}
return error;
});
}
@Override
public Request buildRequest(String method, URI uri, Map<String, String> headers) {
return Utils.buildRequest(method, uri, headers);
}
@Override
public int sendRequest(Request request, String method, URI uri, Map<String, String> headers)
throws Exception {
return Await.result(doSendRequest(request, uri), Duration.fromSeconds(10)).statusCode();
}
@Override
public void sendRequestWithCallback(
Request request,
String method,
URI uri,
Map<String, String> headers,
HttpClientResult httpClientResult) {
doSendRequest(request, uri)
.onSuccess(
r -> {
httpClientResult.complete(r.statusCode());
return null;
})
.onFailure(
t -> {
httpClientResult.complete(t);
return null;
});
}
private static Set<AttributeKey<?>> getHttpAttributes(URI uri) {
String uriString = uri.toString();
// http://localhost:61/ => unopened port, https://192.0.2.1/ => non routable address
if ("http://localhost:61/".equals(uriString) || "https://192.0.2.1/".equals(uriString)) {
return Collections.emptySet();
}
Set<AttributeKey<?>> attributes = new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES);
attributes.remove(SemanticAttributes.SERVER_ADDRESS);
attributes.remove(SemanticAttributes.SERVER_PORT);
return attributes;
}
// borrowed from AbstractNetty41ClientTest as finagle's underlying framework under test here is
// netty
private static String getExpectedClientSpanName(URI uri, String method) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "https://192.0.2.1/": // non routable address
return "CONNECT";
default:
return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply(uri, method);
}
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.finagle.v23_11;
import com.twitter.finagle.Http;
import com.twitter.finagle.ListeningServer;
class ServerH1Test extends AbstractServerTest {
@Override
protected ListeningServer setupServer() {
return Http.server()
.withNoHttp2()
.serve(address.getHost() + ":" + port, new AbstractServerTest.TestService());
}
}

View File

@ -0,0 +1,110 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.finagle.v23_11;
import static io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolSpecificEvent.SWITCHING_PROTOCOLS;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import com.twitter.finagle.Http;
import com.twitter.finagle.ListeningServer;
import com.twitter.finagle.Service;
import com.twitter.finagle.http.Request;
import com.twitter.finagle.http.Response;
import com.twitter.finagle.http2.param.PriorKnowledge;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolSpecificEvent;
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint;
import io.opentelemetry.sdk.testing.assertj.EventDataAssert;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.collect.ImmutableMap;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
class ServerH2Test extends AbstractServerTest {
@Override
protected ListeningServer setupServer() {
return Http.server()
// when enabled, supports protocol h1 & h2, the latter with upgrade
.withHttp2()
// todo implement http/2-specific tests
// the armeria configuration used at the heart of AbstractHttpServerTest isn't configurable
// to http/2
.configured(PriorKnowledge.apply(true).mk())
.serve(address.getHost() + ":" + port, new AbstractServerTest.TestService());
}
private static void assertSwitchingProtocolsEvent(EventDataAssert eventDataAssert) {
eventDataAssert
.hasName(SWITCHING_PROTOCOLS.eventName())
.hasAttributes(
Attributes.of(
ProtocolSpecificEvent.SWITCHING_PROTOCOLS_FROM_KEY,
"HTTP/1.1",
ProtocolSpecificEvent.SWITCHING_PROTOCOLS_TO_KEY,
Collections.singletonList("h2c")));
}
@Test
void h2ProtocolUpgrade() throws Exception {
URI uri = URI.create("http://localhost:" + port + SUCCESS.getPath());
Service<Request, Response> client =
Utils.createClient(Utils.ClientType.DEFAULT)
// must use http2 here
.withHttp2()
.newService(uri.getHost() + ":" + uri.getPort());
Response response =
Await.result(
client.apply(
Utils.buildRequest(
"GET",
uri,
ImmutableMap.of(
HttpHeaderNames.USER_AGENT.toString(),
TEST_USER_AGENT,
HttpHeaderNames.X_FORWARDED_FOR.toString(),
TEST_CLIENT_IP))),
com.twitter.util.Duration.fromSeconds(20));
Await.result(client.close(), Duration.fromSeconds(5));
assertThat(response.status().code()).isEqualTo(SUCCESS.getStatus());
assertThat(response.contentString()).isEqualTo(SUCCESS.getBody());
String method = "GET";
ServerEndpoint endpoint = SUCCESS;
testing.waitAndAssertTraces(
trace -> {
List<Consumer<SpanDataAssert>> spanAssertions = new ArrayList<>();
spanAssertions.add(
s -> s.hasEventsSatisfyingExactly(ServerH2Test::assertSwitchingProtocolsEvent));
spanAssertions.add(
span -> {
assertServerSpan(span, method, endpoint, endpoint.getStatus());
span.hasEventsSatisfyingExactly(ServerH2Test::assertSwitchingProtocolsEvent);
});
int parentIndex = 1;
spanAssertions.add(
span -> {
assertControllerSpan(span, null);
span.hasParent(trace.getSpan(parentIndex));
});
trace.hasSpansSatisfyingExactly(spanAssertions);
});
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.finagle.v23_11;
import static io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest.CONNECTION_TIMEOUT;
import static io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest.READ_TIMEOUT;
import com.twitter.finagle.Http;
import com.twitter.finagle.http.Method;
import com.twitter.finagle.http.Request;
import com.twitter.finagle.service.RetryBudget;
import com.twitter.util.Duration;
import java.net.URI;
import java.util.Locale;
import java.util.Map;
final class Utils {
private Utils() {}
static Http.Client createClient(ClientType clientType) {
Http.Client client =
Http.client()
.withNoHttp2()
.withTransport()
.readTimeout(Duration.fromMilliseconds(READ_TIMEOUT.toMillis()))
.withTransport()
.connectTimeout(Duration.fromMilliseconds(CONNECTION_TIMEOUT.toMillis()))
// disable automatic retries -- retries will result in under-counting traces in the
// tests
.withRetryBudget(RetryBudget.Empty());
switch (clientType) {
case TLS:
client = client.withTransport().tlsWithoutValidation();
break;
case SINGLE_CONN:
client = client.withSessionPool().maxSize(1);
break;
case DEFAULT:
break;
}
return client;
}
enum ClientType {
TLS,
SINGLE_CONN,
DEFAULT;
}
static int safePort(URI uri) {
int port = uri.getPort();
if (port == -1) {
port = uri.getScheme().equals("https") ? 443 : 80;
}
return port;
}
static Request buildRequest(String method, URI uri, Map<String, String> headers) {
Request request =
Request.apply(
Method.apply(method.toUpperCase(Locale.ENGLISH)),
uri.getPath() + (uri.getQuery() == null ? "" : "?" + uri.getRawQuery()));
request.host(uri.getHost() + ":" + safePort(uri));
headers.forEach((key, value) -> request.headerMap().put(key, value));
return request;
}
}

View File

@ -5,7 +5,9 @@
package io.opentelemetry.javaagent.instrumentation.netty.v4_1;
import static io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyClientSingletons.clientHandlerFactory;
import static io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyClientSingletons.sslInstrumenter;
import static io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyServerSingletons.serverTelemetry;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
@ -22,12 +24,6 @@ import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettySslInstrumentationHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientRequestTracingHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientResponseTracingHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientTracingHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerRequestTracingHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerResponseTracingHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerTracingHandler;
import io.opentelemetry.javaagent.bootstrap.CallDepth;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.netty.v4.common.AbstractNettyChannelPipelineInstrumentation;
@ -105,23 +101,21 @@ public class NettyChannelPipelineInstrumentation
// Server pipeline handlers
if (handler instanceof HttpServerCodec) {
ourHandler =
new HttpServerTracingHandler(
NettyServerSingletons.instrumenter(),
NettyHttpServerResponseBeforeCommitHandler.INSTANCE);
serverTelemetry()
.createCombinedHandler(NettyHttpServerResponseBeforeCommitHandler.INSTANCE);
} else if (handler instanceof HttpRequestDecoder) {
ourHandler = new HttpServerRequestTracingHandler(NettyServerSingletons.instrumenter());
ourHandler = serverTelemetry().createRequestHandler();
} else if (handler instanceof HttpResponseEncoder) {
ourHandler =
new HttpServerResponseTracingHandler(
NettyServerSingletons.instrumenter(),
NettyHttpServerResponseBeforeCommitHandler.INSTANCE);
serverTelemetry()
.createCombinedHandler(NettyHttpServerResponseBeforeCommitHandler.INSTANCE);
// Client pipeline handlers
} else if (handler instanceof HttpClientCodec) {
ourHandler = new HttpClientTracingHandler(NettyClientSingletons.instrumenter());
ourHandler = clientHandlerFactory().createCombinedHandler();
} else if (handler instanceof HttpRequestEncoder) {
ourHandler = new HttpClientRequestTracingHandler(NettyClientSingletons.instrumenter());
ourHandler = clientHandlerFactory().createRequestHandler();
} else if (handler instanceof HttpResponseDecoder) {
ourHandler = new HttpClientResponseTracingHandler(NettyClientSingletons.instrumenter());
ourHandler = clientHandlerFactory().createResponseHandler();
// the SslHandler lives in the netty-handler module, using class name comparison to avoid
// adding a dependency
} else if (handler.getClass().getName().equals("io.netty.handler.ssl.SslHandler")) {

View File

@ -14,6 +14,7 @@ import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyClientInstrumenterFactory;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettyConnectionInstrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.internal.client.NettySslInstrumenter;
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.NettyClientHandlerFactory;
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.Collections;
@ -30,6 +31,7 @@ public final class NettyClientSingletons {
private static final Instrumenter<HttpRequestAndChannel, HttpResponse> INSTRUMENTER;
private static final NettyConnectionInstrumenter CONNECTION_INSTRUMENTER;
private static final NettySslInstrumenter SSL_INSTRUMENTER;
private static final NettyClientHandlerFactory CLIENT_HANDLER_FACTORY;
static {
NettyClientInstrumenterFactory factory =
@ -51,6 +53,9 @@ public final class NettyClientSingletons {
Collections.emptyList());
CONNECTION_INSTRUMENTER = factory.createConnectionInstrumenter();
SSL_INSTRUMENTER = factory.createSslInstrumenter();
CLIENT_HANDLER_FACTORY =
new NettyClientHandlerFactory(
INSTRUMENTER, CommonConfig.get().shouldEmitExperimentalHttpClientTelemetry());
}
public static Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter() {
@ -65,5 +70,9 @@ public final class NettyClientSingletons {
return SSL_INSTRUMENTER;
}
public static NettyClientHandlerFactory clientHandlerFactory() {
return CLIENT_HANDLER_FACTORY;
}
private NettyClientSingletons() {}
}

View File

@ -41,6 +41,7 @@ public class NettyInstrumentationModule extends InstrumentationModule {
new BootstrapInstrumentation(),
new NettyFutureInstrumentation(),
new NettyChannelPipelineInstrumentation(),
new AbstractChannelHandlerContextInstrumentation());
new AbstractChannelHandlerContextInstrumentation(),
new SingleThreadEventExecutorInstrumentation());
}
}

View File

@ -5,30 +5,29 @@
package io.opentelemetry.javaagent.instrumentation.netty.v4_1;
import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4.common.internal.server.NettyServerInstrumenterFactory;
import io.opentelemetry.instrumentation.netty.v4_1.NettyServerTelemetry;
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
public final class NettyServerSingletons {
private static final Instrumenter<HttpRequestAndChannel, HttpResponse> INSTRUMENTER =
NettyServerInstrumenterFactory.create(
GlobalOpenTelemetry.get(),
"io.opentelemetry.netty-4.1",
builder ->
builder
.setCapturedRequestHeaders(CommonConfig.get().getServerRequestHeaders())
.setCapturedResponseHeaders(CommonConfig.get().getServerResponseHeaders())
.setKnownMethods(CommonConfig.get().getKnownHttpRequestMethods()),
builder -> builder.setKnownMethods(CommonConfig.get().getKnownHttpRequestMethods()),
builder -> builder.setKnownMethods(CommonConfig.get().getKnownHttpRequestMethods()),
CommonConfig.get().shouldEmitExperimentalHttpServerTelemetry());
static {
SERVER_TELEMETRY =
NettyServerTelemetry.builder(GlobalOpenTelemetry.get())
.setEmitExperimentalHttpServerEvents(
CommonConfig.get().shouldEmitExperimentalHttpServerTelemetry())
.setEmitExperimentalHttpServerMetrics(
CommonConfig.get().shouldEmitExperimentalHttpServerTelemetry())
.setKnownMethods(CommonConfig.get().getKnownHttpRequestMethods())
.setCapturedRequestHeaders(CommonConfig.get().getServerRequestHeaders())
.setCapturedResponseHeaders(CommonConfig.get().getServerResponseHeaders())
.build();
}
public static Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter() {
return INSTRUMENTER;
private static final NettyServerTelemetry SERVER_TELEMETRY;
public static NettyServerTelemetry serverTelemetry() {
return SERVER_TELEMETRY;
}
private NettyServerSingletons() {}

View File

@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.netty.v4_1;
import static net.bytebuddy.matcher.ElementMatchers.named;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class SingleThreadEventExecutorInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.netty.util.concurrent.SingleThreadEventExecutor");
}
@Override
public void transform(TypeTransformer transformer) {
// this method submits a task that runs for forever to an executor, propagating context there
// would result in a context leak
transformer.applyAdviceToMethod(
named("startThread"), this.getClass().getName() + "$DisablePropagationAdvice");
}
@SuppressWarnings("unused")
public static class DisablePropagationAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope onEnter() {
if (Java8BytecodeBridge.currentContext() != Java8BytecodeBridge.rootContext()) {
// Prevent context from leaking by running this method under root context.
// Root context is not propagated by executor instrumentation.
return Java8BytecodeBridge.rootContext().makeCurrent();
}
return null;
}
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Enter Scope scope) {
if (scope != null) {
scope.close();
}
}
}
}

View File

@ -15,17 +15,18 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientRequestTracingHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientResponseTracingHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientTracingHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.NettyClientHandlerFactory;
/** Entrypoint for instrumenting Netty HTTP clients. */
public final class NettyClientTelemetry {
private final Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter;
private final NettyClientHandlerFactory handlerFactory;
NettyClientTelemetry(Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter) {
this.instrumenter = instrumenter;
NettyClientTelemetry(
Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter,
boolean emitExperimentalHttpClientEvents) {
this.handlerFactory =
new NettyClientHandlerFactory(instrumenter, emitExperimentalHttpClientEvents);
}
/** Returns a new {@link NettyClientTelemetry} configured with the given {@link OpenTelemetry}. */
@ -42,11 +43,11 @@ public final class NettyClientTelemetry {
}
/**
* /** Returns a new {@link ChannelOutboundHandlerAdapter} that generates telemetry for outgoing
* HTTP requests. Must be paired with {@link #createResponseHandler()}.
* Returns a new {@link ChannelOutboundHandlerAdapter} that generates telemetry for outgoing HTTP
* requests. Must be paired with {@link #createResponseHandler()}.
*/
public ChannelOutboundHandlerAdapter createRequestHandler() {
return new HttpClientRequestTracingHandler(instrumenter);
return handlerFactory.createRequestHandler();
}
/**
@ -54,7 +55,7 @@ public final class NettyClientTelemetry {
* responses. Must be paired with {@link #createRequestHandler()}.
*/
public ChannelInboundHandlerAdapter createResponseHandler() {
return new HttpClientResponseTracingHandler(instrumenter);
return handlerFactory.createResponseHandler();
}
/**
@ -64,7 +65,7 @@ public final class NettyClientTelemetry {
public CombinedChannelDuplexHandler<
? extends ChannelInboundHandlerAdapter, ? extends ChannelOutboundHandlerAdapter>
createCombinedHandler() {
return new HttpClientTracingHandler(instrumenter);
return handlerFactory.createCombinedHandler();
}
/**

View File

@ -33,11 +33,19 @@ public final class NettyClientTelemetryBuilder {
private Consumer<HttpSpanNameExtractorBuilder<HttpRequestAndChannel>>
spanNameExtractorConfigurer = builder -> {};
private boolean emitExperimentalHttpClientMetrics = false;
private boolean emitExperimentalHttpClientEvents = false;
NettyClientTelemetryBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}
@CanIgnoreReturnValue
public NettyClientTelemetryBuilder setEmitExperimentalHttpClientEvents(
boolean emitExperimentalHttpClientEvents) {
this.emitExperimentalHttpClientEvents = emitExperimentalHttpClientEvents;
return this;
}
/**
* Configures the HTTP request headers that will be captured as span attributes.
*
@ -123,6 +131,7 @@ public final class NettyClientTelemetryBuilder {
PeerServiceResolver.create(Collections.emptyMap()),
emitExperimentalHttpClientMetrics)
.createHttpInstrumenter(
extractorConfigurer, spanNameExtractorConfigurer, additionalAttributesExtractors));
extractorConfigurer, spanNameExtractorConfigurer, additionalAttributesExtractors),
emitExperimentalHttpClientEvents);
}
}

View File

@ -12,6 +12,7 @@ import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolEventHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerRequestTracingHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerResponseBeforeCommitHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerResponseTracingHandler;
@ -21,9 +22,13 @@ import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerTra
public final class NettyServerTelemetry {
private final Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter;
private final ProtocolEventHandler protocolEventHandler;
NettyServerTelemetry(Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter) {
NettyServerTelemetry(
Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter,
ProtocolEventHandler protocolEventHandler) {
this.instrumenter = instrumenter;
this.protocolEventHandler = protocolEventHandler;
}
/** Returns a new {@link NettyServerTelemetry} configured with the given {@link OpenTelemetry}. */
@ -52,8 +57,12 @@ public final class NettyServerTelemetry {
* responses. Must be paired with {@link #createRequestHandler()}.
*/
public ChannelOutboundHandlerAdapter createResponseHandler() {
return new HttpServerResponseTracingHandler(
instrumenter, HttpServerResponseBeforeCommitHandler.Noop.INSTANCE);
return createResponseHandler(HttpServerResponseBeforeCommitHandler.Noop.INSTANCE);
}
public ChannelOutboundHandlerAdapter createResponseHandler(
HttpServerResponseBeforeCommitHandler commitHandler) {
return new HttpServerResponseTracingHandler(instrumenter, commitHandler, protocolEventHandler);
}
/**
@ -63,7 +72,12 @@ public final class NettyServerTelemetry {
public CombinedChannelDuplexHandler<
? extends ChannelInboundHandlerAdapter, ? extends ChannelOutboundHandlerAdapter>
createCombinedHandler() {
return new HttpServerTracingHandler(
instrumenter, HttpServerResponseBeforeCommitHandler.Noop.INSTANCE);
return createCombinedHandler(HttpServerResponseBeforeCommitHandler.Noop.INSTANCE);
}
public CombinedChannelDuplexHandler<
? extends ChannelInboundHandlerAdapter, ? extends ChannelOutboundHandlerAdapter>
createCombinedHandler(HttpServerResponseBeforeCommitHandler commitHandler) {
return new HttpServerTracingHandler(instrumenter, commitHandler, protocolEventHandler);
}
}

View File

@ -13,6 +13,7 @@ import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteBuilder;
import io.opentelemetry.instrumentation.api.semconv.http.HttpSpanNameExtractorBuilder;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4.common.internal.server.NettyServerInstrumenterFactory;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolEventHandler;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
@ -29,11 +30,24 @@ public final class NettyServerTelemetryBuilder {
private Consumer<HttpServerRouteBuilder<HttpRequestAndChannel>> httpServerRouteConfigurer =
builder -> {};
private boolean emitExperimentalHttpServerMetrics = false;
private boolean emitExperimentalHttpServerEvents = false;
NettyServerTelemetryBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}
/**
* Configures emission of experimental events.
*
* @param emitExperimentalHttpServerEvents set to true to emit events
*/
@CanIgnoreReturnValue
public NettyServerTelemetryBuilder setEmitExperimentalHttpServerEvents(
boolean emitExperimentalHttpServerEvents) {
this.emitExperimentalHttpServerEvents = emitExperimentalHttpServerEvents;
return this;
}
/**
* Configures the HTTP request headers that will be captured as span attributes.
*
@ -108,6 +122,9 @@ public final class NettyServerTelemetryBuilder {
extractorConfigurer,
spanNameExtractorConfigurer,
httpServerRouteConfigurer,
emitExperimentalHttpServerMetrics));
emitExperimentalHttpServerMetrics),
emitExperimentalHttpServerEvents
? ProtocolEventHandler.Enabled.INSTANCE
: ProtocolEventHandler.Noop.INSTANCE);
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.netty.v4_1.internal;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.context.Context;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public interface ProtocolEventHandler {
void handle(
ProtocolSpecificEvent event, Context context, HttpRequest request, HttpResponse response);
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
enum Noop implements ProtocolEventHandler {
INSTANCE;
@Override
public void handle(
ProtocolSpecificEvent event, Context context, HttpRequest request, HttpResponse response) {}
}
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
enum Enabled implements ProtocolEventHandler {
INSTANCE;
@Override
public void handle(
ProtocolSpecificEvent event, Context context, HttpRequest request, HttpResponse response) {
event.addEvent(context, request, response);
}
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.netty.v4_1.internal;
import static io.opentelemetry.api.common.AttributeKey.stringArrayKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import java.util.List;
import javax.annotation.Nullable;
/**
* Adds events to {@link Span}s for the enumerated protocols and situations
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public enum ProtocolSpecificEvent {
/**
* The event after which point the server or client transmits or receives, respectively, in one of
* the signified upgraded protocols, per <a
* href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism">protocol
* upgrade mechanism</a>.
*/
SWITCHING_PROTOCOLS("http.response.status_code.101.upgrade") {
@Override
void addEvent(Context context, HttpRequest request, HttpResponse response) {
Span.fromContext(context)
.addEvent(
eventName(),
Attributes.of(
SWITCHING_PROTOCOLS_FROM_KEY,
request != null ? request.protocolVersion().text() : "unknown",
// pulls out all possible values emitted by upgrade header, per:
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Upgrade
SWITCHING_PROTOCOLS_TO_KEY,
response.headers().getAll("upgrade")));
}
};
public static final AttributeKey<String> SWITCHING_PROTOCOLS_FROM_KEY =
stringKey("network.protocol.from");
public static final AttributeKey<List<String>> SWITCHING_PROTOCOLS_TO_KEY =
stringArrayKey("network.protocol.to");
private final String eventName;
ProtocolSpecificEvent(String eventName) {
this.eventName = eventName;
}
public String eventName() {
return eventName;
}
abstract void addEvent(
Context context, @Nullable HttpRequest request, @Nullable HttpResponse response);
}

View File

@ -11,6 +11,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
@ -19,6 +20,8 @@ import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolEventHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolSpecificEvent;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
@ -30,10 +33,13 @@ public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapt
AttributeKey.valueOf(HttpClientResponseTracingHandler.class, "http-client-response");
private final Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter;
private final ProtocolEventHandler protocolEventHandler;
public HttpClientResponseTracingHandler(
Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter) {
Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter,
ProtocolEventHandler protocolEventHandler) {
this.instrumenter = instrumenter;
this.protocolEventHandler = protocolEventHandler;
}
@Override
@ -49,21 +55,48 @@ public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapt
Context parentContext = parentContextAttr.get();
if (msg instanceof FullHttpResponse) {
HttpRequestAndChannel request = ctx.channel().attr(HTTP_CLIENT_REQUEST).getAndSet(null);
instrumenter.end(context, request, (HttpResponse) msg, null);
contextAttr.set(null);
parentContextAttr.set(null);
FullHttpResponse response = (FullHttpResponse) msg;
if (response.status().equals(HttpResponseStatus.SWITCHING_PROTOCOLS)) {
HttpRequestAndChannel request = ctx.channel().attr(HTTP_CLIENT_REQUEST).get();
protocolEventHandler.handle(
ProtocolSpecificEvent.SWITCHING_PROTOCOLS,
context,
request != null ? request.request() : null,
response);
} else {
HttpRequestAndChannel request = ctx.channel().attr(HTTP_CLIENT_REQUEST).getAndSet(null);
instrumenter.end(context, request, (HttpResponse) msg, null);
contextAttr.set(null);
parentContextAttr.set(null);
}
} else if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
if (response.status().equals(HttpResponseStatus.SWITCHING_PROTOCOLS)) {
HttpRequestAndChannel request = ctx.channel().attr(HTTP_CLIENT_REQUEST).get();
protocolEventHandler.handle(
ProtocolSpecificEvent.SWITCHING_PROTOCOLS,
context,
request != null ? request.request() : null,
response);
}
// HTTP 101 proto switch note: netty sends EmptyLastHttpContent upon proto upgrade;
// setting this here ensures we can see in the next if-block (LastHttpContent) whether
// the latest http status was indeed 101 or something else.
// Headers before body have been received, store them to use when finishing the span.
ctx.channel().attr(HTTP_CLIENT_RESPONSE).set((HttpResponse) msg);
} else if (msg instanceof LastHttpContent) {
// Not a FullHttpResponse so this is content that has been received after headers.
// Finish the span using what we stored in attrs.
HttpRequestAndChannel request = ctx.channel().attr(HTTP_CLIENT_REQUEST).getAndSet(null);
HttpResponse response = ctx.channel().attr(HTTP_CLIENT_RESPONSE).getAndSet(null);
instrumenter.end(context, request, response, null);
contextAttr.set(null);
parentContextAttr.set(null);
HttpResponse responseTest = ctx.channel().attr(HTTP_CLIENT_RESPONSE).get();
if (responseTest == null
|| !responseTest.status().equals(HttpResponseStatus.SWITCHING_PROTOCOLS)) {
// Not a FullHttpResponse so this is content that has been received after headers.
// Finish the span using what we stored in attrs.
HttpRequestAndChannel request = ctx.channel().attr(HTTP_CLIENT_REQUEST).getAndSet(null);
HttpResponse response = ctx.channel().attr(HTTP_CLIENT_RESPONSE).getAndSet(null);
instrumenter.end(context, request, response, null);
contextAttr.set(null);
parentContextAttr.set(null);
}
}
// We want the callback in the scope of the parent, not the client span

View File

@ -17,6 +17,7 @@ import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolEventHandler;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
@ -27,9 +28,11 @@ public class HttpClientTracingHandler
HttpClientResponseTracingHandler, HttpClientRequestTracingHandler> {
private final Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter;
public HttpClientTracingHandler(Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter) {
public HttpClientTracingHandler(
Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter,
ProtocolEventHandler protocolEventHandler) {
super(
new HttpClientResponseTracingHandler(instrumenter),
new HttpClientResponseTracingHandler(instrumenter, protocolEventHandler),
new HttpClientRequestTracingHandler(instrumenter));
this.instrumenter = instrumenter;
}

View File

@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.netty.v4_1.internal.client;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolEventHandler;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public class NettyClientHandlerFactory {
private final Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter;
private final ProtocolEventHandler protocolEventHandler;
public NettyClientHandlerFactory(
Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter,
boolean emitExperimentalHttpClientEvents) {
this.instrumenter = instrumenter;
this.protocolEventHandler =
emitExperimentalHttpClientEvents
? ProtocolEventHandler.Enabled.INSTANCE
: ProtocolEventHandler.Noop.INSTANCE;
}
/**
* Returns a new {@link ChannelOutboundHandlerAdapter} that generates telemetry for outgoing HTTP
* requests. Must be paired with {@link #createResponseHandler()}.
*/
public ChannelOutboundHandlerAdapter createRequestHandler() {
return new HttpClientRequestTracingHandler(instrumenter);
}
/**
* Returns a new {@link ChannelInboundHandlerAdapter} that generates telemetry for incoming HTTP
* responses. Must be paired with {@link #createRequestHandler()}.
*/
public ChannelInboundHandlerAdapter createResponseHandler() {
return new HttpClientResponseTracingHandler(instrumenter, protocolEventHandler);
}
/**
* Returns a new {@link CombinedChannelDuplexHandler} that generates telemetry for outgoing HTTP
* requests and incoming responses in a single handler.
*/
public CombinedChannelDuplexHandler<
? extends ChannelInboundHandlerAdapter, ? extends ChannelOutboundHandlerAdapter>
createCombinedHandler() {
return new HttpClientTracingHandler(instrumenter, protocolEventHandler);
}
}

View File

@ -11,6 +11,7 @@ import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
@ -20,6 +21,8 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.common.internal.NettyErrorHolder;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolEventHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolSpecificEvent;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import java.util.Deque;
import javax.annotation.Nullable;
@ -35,12 +38,15 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap
private final Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter;
private final HttpServerResponseBeforeCommitHandler beforeCommitHandler;
private final ProtocolEventHandler eventHandler;
public HttpServerResponseTracingHandler(
Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter,
HttpServerResponseBeforeCommitHandler beforeCommitHandler) {
HttpServerResponseBeforeCommitHandler beforeCommitHandler,
ProtocolEventHandler eventHandler) {
this.instrumenter = instrumenter;
this.beforeCommitHandler = beforeCommitHandler;
this.eventHandler = eventHandler;
}
@Override
@ -50,6 +56,7 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap
Deque<ServerContext> serverContexts = serverContextAttr.get();
ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null;
if (serverContext == null) {
super.write(ctx, msg, prm);
return;
@ -69,32 +76,54 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap
// Going to finish the span after the write of the last content finishes.
if (msg instanceof FullHttpResponse) {
// Headers and body all sent together, we have the response information in the msg.
beforeCommitHandler.handle(serverContext.context(), (HttpResponse) msg);
serverContexts.removeFirst();
writePromise.addListener(
future ->
end(
serverContext.context(),
serverContext.request(),
(FullHttpResponse) msg,
writePromise));
FullHttpResponse response = (FullHttpResponse) msg;
if (response.status().equals(HttpResponseStatus.SWITCHING_PROTOCOLS)) {
eventHandler.handle(
ProtocolSpecificEvent.SWITCHING_PROTOCOLS,
serverContext.context(),
serverContext.request().request(),
response);
} else {
// Headers and body all sent together, we have the response information in the msg.
beforeCommitHandler.handle(serverContext.context(), (HttpResponse) msg);
serverContexts.removeFirst();
writePromise.addListener(
future ->
end(
serverContext.context(),
serverContext.request(),
(FullHttpResponse) msg,
writePromise));
}
} else {
// Body sent after headers. We stored the response information in the context when
// encountering HttpResponse (which was not FullHttpResponse since it's not
// LastHttpContent).
serverContexts.removeFirst();
HttpResponse response = ctx.channel().attr(HTTP_SERVER_RESPONSE).getAndSet(null);
writePromise.addListener(
future ->
end(serverContext.context(), serverContext.request(), response, writePromise));
HttpResponse responseTest = ctx.channel().attr(HTTP_SERVER_RESPONSE).get();
if (responseTest == null
|| !responseTest.status().equals(HttpResponseStatus.SWITCHING_PROTOCOLS)) {
// Body sent after headers. We stored the response information in the context when
// encountering HttpResponse (which was not FullHttpResponse since it's not
// LastHttpContent).
serverContexts.removeFirst();
HttpResponse response = ctx.channel().attr(HTTP_SERVER_RESPONSE).getAndSet(null);
writePromise.addListener(
future ->
end(serverContext.context(), serverContext.request(), response, writePromise));
}
}
} else {
writePromise = prm;
if (msg instanceof HttpResponse) {
// Headers before body has been sent, store them to use when finishing the span.
beforeCommitHandler.handle(serverContext.context(), (HttpResponse) msg);
ctx.channel().attr(HTTP_SERVER_RESPONSE).set((HttpResponse) msg);
HttpResponse response = (HttpResponse) msg;
if (response.status().equals(HttpResponseStatus.SWITCHING_PROTOCOLS)) {
eventHandler.handle(
ProtocolSpecificEvent.SWITCHING_PROTOCOLS,
serverContext.context(),
serverContext.request().request(),
response);
} else {
// Headers before body has been sent, store them to use when finishing the span.
beforeCommitHandler.handle(serverContext.context(), response);
ctx.channel().attr(HTTP_SERVER_RESPONSE).set(response);
}
}
}

View File

@ -9,6 +9,7 @@ import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolEventHandler;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
@ -20,9 +21,11 @@ public class HttpServerTracingHandler
public HttpServerTracingHandler(
Instrumenter<HttpRequestAndChannel, HttpResponse> instrumenter,
HttpServerResponseBeforeCommitHandler responseBeforeCommitHandler) {
HttpServerResponseBeforeCommitHandler responseBeforeCommitHandler,
ProtocolEventHandler protocolEventHandler) {
super(
new HttpServerRequestTracingHandler(instrumenter),
new HttpServerResponseTracingHandler(instrumenter, responseBeforeCommitHandler));
new HttpServerResponseTracingHandler(
instrumenter, responseBeforeCommitHandler, protocolEventHandler));
}
}

View File

@ -240,6 +240,7 @@ include(":instrumentation:executors:javaagent")
include(":instrumentation:executors:testing")
include(":instrumentation:external-annotations:javaagent")
include(":instrumentation:external-annotations:javaagent-unit-tests")
include(":instrumentation:finagle-http-23.11:javaagent")
include(":instrumentation:finatra-2.9:javaagent")
include(":instrumentation:geode-1.4:javaagent")
include(":instrumentation:google-http-client-1.19:javaagent")