Capture http.route for akka-http (#10039)

Co-authored-by: jason plumb <75337021+breedx-splk@users.noreply.github.com>
This commit is contained in:
Lauri Tulmin 2023-12-12 21:10:17 +02:00 committed by GitHub
parent 49befbd024
commit 14dc34e50e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 482 additions and 44 deletions

View File

@ -24,6 +24,7 @@ import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.bootstrap.http.HttpServerResponseCustomizerHolder;
import io.opentelemetry.javaagent.instrumentation.akkahttp.server.route.AkkaRouteHolder;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
@ -118,6 +119,7 @@ public class AkkaFlowWrapper
Context parentContext = currentContext();
if (instrumenter().shouldStart(parentContext, request)) {
Context context = instrumenter().start(parentContext, request);
context = AkkaRouteHolder.init(context);
tracingRequest = new TracingRequest(context, request);
}
// event if span wasn't started we need to push TracingRequest to match response

View File

@ -27,6 +27,13 @@ public class AkkaHttpServerInstrumentationModule extends InstrumentationModule {
return hasClassesNamed("akka.http.scaladsl.HttpExt");
}
@Override
public boolean isIndyModule() {
// AkkaHttpServerInstrumentationModule and AkkaHttpServerRouteInstrumentationModule share
// AkkaRouteHolder class
return false;
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new HttpExtServerInstrumentation(), new GraphInterpreterInstrumentation());

View File

@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.akkahttp.server.route;
import static java.util.Arrays.asList;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;
/**
* This instrumentation applies to classes in akka-http.jar while
* AkkaHttpServerInstrumentationModule applies to classes in akka-http-core.jar
*/
@AutoService(InstrumentationModule.class)
public class AkkaHttpServerRouteInstrumentationModule extends InstrumentationModule {
public AkkaHttpServerRouteInstrumentationModule() {
super("akka-http", "akka-http-10.0", "akka-http-server", "akka-http-server-route");
}
@Override
public boolean isIndyModule() {
// AkkaHttpServerInstrumentationModule and AkkaHttpServerRouteInstrumentationModule share
// AkkaRouteHolder class
return false;
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new PathMatcherInstrumentation(),
new PathMatcherStaticInstrumentation(),
new RouteConcatenationInstrumentation(),
new PathConcatenationInstrumentation());
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.akkahttp.server.route;
import static io.opentelemetry.context.ContextKey.named;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.ImplicitContextKeyed;
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute;
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource;
import java.util.ArrayDeque;
import java.util.Deque;
public class AkkaRouteHolder implements ImplicitContextKeyed {
private static final ContextKey<AkkaRouteHolder> KEY = named("opentelemetry-akka-route");
private String route = "";
private boolean newSegment;
private boolean endMatched;
private final Deque<String> stack = new ArrayDeque<>();
public static Context init(Context context) {
if (context.get(KEY) != null) {
return context;
}
return context.with(new AkkaRouteHolder());
}
public static void push(String path) {
AkkaRouteHolder holder = Context.current().get(KEY);
if (holder != null && holder.newSegment && !holder.endMatched) {
holder.route += path;
holder.newSegment = false;
}
}
public static void startSegment() {
AkkaRouteHolder holder = Context.current().get(KEY);
if (holder != null) {
holder.newSegment = true;
}
}
public static void endMatched() {
Context context = Context.current();
AkkaRouteHolder holder = context.get(KEY);
if (holder != null) {
holder.endMatched = true;
HttpServerRoute.update(context, HttpServerRouteSource.CONTROLLER, holder.route);
}
}
public static void save() {
AkkaRouteHolder holder = Context.current().get(KEY);
if (holder != null) {
holder.stack.push(holder.route);
holder.newSegment = true;
}
}
public static void restore() {
AkkaRouteHolder holder = Context.current().get(KEY);
if (holder != null) {
holder.route = holder.stack.pop();
holder.newSegment = true;
}
}
@Override
public Context storeInContext(Context context) {
return context.with(KEY, this);
}
private AkkaRouteHolder() {}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.akkahttp.server.route;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class PathConcatenationInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf(
"akka.http.scaladsl.server.PathMatcher$$anonfun$$tilde$1",
"akka.http.scaladsl.server.PathMatcher");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
namedOneOf("apply", "$anonfun$append$1"), this.getClass().getName() + "$ApplyAdvice");
}
@SuppressWarnings("unused")
public static class ApplyAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
// https://github.com/akka/akka-http/blob/0fedb87671ecc450e7378713105ea1dc1d9d0c7d/akka-http/src/main/scala/akka/http/scaladsl/server/PathMatcher.scala#L43
// https://github.com/akka/akka-http/blob/0fedb87671ecc450e7378713105ea1dc1d9d0c7d/akka-http/src/main/scala/akka/http/scaladsl/server/PathMatcher.scala#L47
// when routing dsl uses path("path1" / "path2") we are concatenating 3 segments "path1" and /
// and "path2" we need to notify the matcher that a new segment has started, so it could be
// captured in the route
AkkaRouteHolder.startSegment();
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.akkahttp.server.route;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import akka.http.scaladsl.model.Uri;
import akka.http.scaladsl.server.PathMatcher;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class PathMatcherInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("akka.http.scaladsl.server.PathMatcher$");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("apply")
.and(takesArgument(0, named("akka.http.scaladsl.model.Uri$Path")))
.and(returns(named("akka.http.scaladsl.server.PathMatcher"))),
this.getClass().getName() + "$ApplyAdvice");
}
@SuppressWarnings("unused")
public static class ApplyAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) Uri.Path prefix, @Advice.Return PathMatcher<?> result) {
// store the path being matched inside a VirtualField on the given matcher, so it can be used
// for constructing the route
VirtualField.find(PathMatcher.class, String.class).set(result, prefix.toString());
}
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.akkahttp.server.route;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.extendsClass;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import akka.http.scaladsl.model.Uri;
import akka.http.scaladsl.server.PathMatcher;
import akka.http.scaladsl.server.PathMatchers;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class PathMatcherStaticInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return extendsClass(named("akka.http.scaladsl.server.PathMatcher"));
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("apply").and(takesArgument(0, named("akka.http.scaladsl.model.Uri$Path"))),
this.getClass().getName() + "$ApplyAdvice");
}
@SuppressWarnings("unused")
public static class ApplyAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.This PathMatcher<?> pathMatcher,
@Advice.Argument(0) Uri.Path path,
@Advice.Return PathMatcher.Matching<?> result) {
// result is either matched or unmatched, we only care about the matches
if (result.getClass() == PathMatcher.Matched.class) {
if (PathMatchers.PathEnd$.class == pathMatcher.getClass()) {
AkkaRouteHolder.endMatched();
return;
}
// for remember the matched path in PathMatcherInstrumentation, otherwise we just use a *
String prefix = VirtualField.find(PathMatcher.class, String.class).get(pathMatcher);
if (prefix == null) {
if (PathMatchers.Slash$.class == pathMatcher.getClass()) {
prefix = "/";
} else {
prefix = "*";
}
}
if (prefix != null) {
AkkaRouteHolder.push(prefix);
}
}
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.akkahttp.server.route;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class RouteConcatenationInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf(
"akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1",
"akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
namedOneOf("apply", "$anonfun$$tilde$1"), this.getClass().getName() + "$ApplyAdvice");
}
@SuppressWarnings("unused")
public static class ApplyAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
// when routing dsl uses concat(path(...) {...}, path(...) {...}) we'll restore the currently
// matched route after each matcher so that match attempts that failed wouldn't get recorded
// in the route
AkkaRouteHolder.save();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit() {
AkkaRouteHolder.restore();
}
}
}

View File

@ -11,9 +11,9 @@ import io.opentelemetry.instrumentation.testing.junit.http.{
HttpServerTestOptions,
ServerEndpoint
}
import io.opentelemetry.semconv.SemanticAttributes
import java.util
import java.util.Collections
import java.util.function.{Function, Predicate}
abstract class AbstractHttpServerInstrumentationTest
@ -25,8 +25,13 @@ abstract class AbstractHttpServerInstrumentationTest
options.setTestCaptureHttpHeaders(false)
options.setHttpAttributes(
new Function[ServerEndpoint, util.Set[AttributeKey[_]]] {
override def apply(v1: ServerEndpoint): util.Set[AttributeKey[_]] =
Collections.emptySet()
override def apply(v1: ServerEndpoint): util.Set[AttributeKey[_]] = {
val set = new util.HashSet[AttributeKey[_]](
HttpServerTestOptions.DEFAULT_HTTP_ATTRIBUTES
)
set.remove(SemanticAttributes.HTTP_ROUTE)
set
}
}
)
options.setHasResponseCustomizer(

View File

@ -5,13 +5,26 @@
package io.opentelemetry.javaagent.instrumentation.akkahttp
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS
import io.opentelemetry.instrumentation.testing.junit.http.{
HttpServerInstrumentationExtension,
HttpServerTestOptions
HttpServerTestOptions,
ServerEndpoint
}
import io.opentelemetry.sdk.testing.assertj.{SpanDataAssert, TraceAssert}
import io.opentelemetry.testing.internal.armeria.common.{
AggregatedHttpRequest,
HttpMethod
}
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.RegisterExtension
import java.util
import java.util.function.{BiFunction, Consumer, Function}
class AkkaHttpServerInstrumentationTest
extends AbstractHttpServerInstrumentationTest {
@RegisterExtension val extension: InstrumentationExtension =
@ -31,5 +44,66 @@ class AkkaHttpServerInstrumentationTest
super.configure(options)
// exception doesn't propagate
options.setTestException(false)
options.setTestPathParam(true)
options.setHttpAttributes(
new Function[ServerEndpoint, util.Set[AttributeKey[_]]] {
override def apply(v1: ServerEndpoint): util.Set[AttributeKey[_]] = {
HttpServerTestOptions.DEFAULT_HTTP_ATTRIBUTES
}
}
)
val expectedRoute = new BiFunction[ServerEndpoint, String, String] {
def apply(endpoint: ServerEndpoint, method: String): String = {
if (endpoint eq ServerEndpoint.PATH_PARAM)
return "/path/*/param"
expectedHttpRoute(endpoint, method)
}
}
options.setExpectedHttpRoute(expectedRoute)
}
@Test def testPathMatchers(): Unit = {
// /test1 / IntNumber / HexIntNumber / LongNumber / HexLongNumber / DoubleNumber / JavaUUID / Remaining
val request = AggregatedHttpRequest.of(
HttpMethod.GET,
address
.resolve(
"/test1/1/a1/2/b2/3.0/e58ed763-928c-4155-bee9-fdbaaadc15f3/remaining"
)
.toString
)
val response = client.execute(request).aggregate.join
assertThat(response.status.code).isEqualTo(SUCCESS.getStatus)
assertThat(response.contentUtf8).isEqualTo(SUCCESS.getBody)
testing.waitAndAssertTraces(new Consumer[TraceAssert] {
override def accept(trace: TraceAssert): Unit =
trace.hasSpansSatisfyingExactly(new Consumer[SpanDataAssert] {
override def accept(span: SpanDataAssert): Unit = {
span.hasName("GET /test1/*/*/*/*/*/*/*")
}
})
})
}
@Test def testConcat(): Unit = {
val request = AggregatedHttpRequest.of(
HttpMethod.GET,
address.resolve("/test2/second").toString
)
val response = client.execute(request).aggregate.join
assertThat(response.status.code).isEqualTo(SUCCESS.getStatus)
assertThat(response.contentUtf8).isEqualTo(SUCCESS.getBody)
testing.waitAndAssertTraces(new Consumer[TraceAssert] {
override def accept(trace: TraceAssert): Unit =
trace.hasSpansSatisfyingExactly(new Consumer[SpanDataAssert] {
override def accept(span: SpanDataAssert): Unit = {
span.hasName("GET /test2/second")
}
})
})
}
}

View File

@ -8,14 +8,10 @@ package io.opentelemetry.javaagent.instrumentation.akkahttp
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.StatusCodes.Found
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.ExceptionHandler
import akka.stream.ActorMaterializer
import io.opentelemetry.instrumentation.testing.junit.http.{
AbstractHttpServerTest,
ServerEndpoint
}
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint._
import java.util.function.Supplier
@ -27,43 +23,70 @@ object AkkaHttpTestWebServer {
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val exceptionHandler = ExceptionHandler { case ex: Exception =>
complete(
HttpResponse(status = EXCEPTION.getStatus).withEntity(ex.getMessage)
)
}
val route = handleExceptions(exceptionHandler) {
extractUri { uri =>
val endpoint = ServerEndpoint.forPath(uri.path.toString())
complete {
AbstractHttpServerTest.controller(
endpoint,
new Supplier[HttpResponse] {
def get(): HttpResponse = {
val resp = HttpResponse(status = endpoint.getStatus)
endpoint match {
case SUCCESS => resp.withEntity(endpoint.getBody)
case INDEXED_CHILD =>
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
override def getParameter(name: String): String =
uri.query().get(name).orNull
})
resp.withEntity("")
case QUERY_PARAM => resp.withEntity(uri.queryString().orNull)
case REDIRECT =>
resp.withHeaders(headers.Location(endpoint.getBody))
case ERROR => resp.withEntity(endpoint.getBody)
case EXCEPTION => throw new Exception(endpoint.getBody)
case _ =>
HttpResponse(status = NOT_FOUND.getStatus)
.withEntity(NOT_FOUND.getBody)
}
var route = get {
concat(
path(SUCCESS.rawPath()) {
complete(
AbstractHttpServerTest.controller(SUCCESS, supplier(SUCCESS.getBody))
)
},
path(INDEXED_CHILD.rawPath()) {
parameterMap { map =>
val supplier = new Supplier[String] {
def get(): String = {
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
override def getParameter(name: String): String =
map.get(name).orNull
})
""
}
}
complete(AbstractHttpServerTest.controller(INDEXED_CHILD, supplier))
}
},
path(QUERY_PARAM.rawPath()) {
extractUri { uri =>
complete(
AbstractHttpServerTest
.controller(INDEXED_CHILD, supplier(uri.queryString().orNull))
)
}
},
path(REDIRECT.rawPath()) {
redirect(
AbstractHttpServerTest
.controller(REDIRECT, supplier(REDIRECT.getBody)),
Found
)
},
path(ERROR.rawPath()) {
complete(
500 -> AbstractHttpServerTest
.controller(ERROR, supplier(ERROR.getBody))
)
},
path("path" / LongNumber / "param") { id =>
complete(
AbstractHttpServerTest.controller(PATH_PARAM, supplier(id.toString))
)
},
path(
"test1" / IntNumber / HexIntNumber / LongNumber / HexLongNumber /
DoubleNumber / JavaUUID / Remaining
) { (_, _, _, _, _, _, _) =>
complete(SUCCESS.getBody)
},
pathPrefix("test2") {
concat(
path("first") {
complete(SUCCESS.getBody)
},
path("second") {
complete(SUCCESS.getBody)
}
)
}
}
)
}
private var binding: ServerBinding = null
@ -83,4 +106,12 @@ object AkkaHttpTestWebServer {
binding = null
}
}
def supplier(string: String): Supplier[String] = {
new Supplier[String] {
def get(): String = {
string
}
}
}
}

View File

@ -31,7 +31,7 @@ public class IgnoredTypesMatcher extends ElementMatcher.Junction.AbstractBase<Ty
// bytecode proxies typically have $$ in their name
if (name.contains("$$") && !name.contains("$$Lambda$") && !name.endsWith("$$Lambda")) {
// allow scala anonymous classes
return !name.contains("$$anon$");
return !name.contains("$$anon$") && !name.contains("$$anonfun$");
}
if (name.contains("$JaxbAccessor")