Add tapir path matching within pekko instrumentation (#13386)

Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
This commit is contained in:
Mason Lazalier Edmison 2025-03-07 17:18:14 -06:00 committed by GitHub
parent 0970da7489
commit ffeb80eb26
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 334 additions and 27 deletions

View File

@ -10,6 +10,7 @@ muzzle {
versions.set("[1.0,)") versions.set("[1.0,)")
assertInverse.set(true) assertInverse.set(true)
extraDependency("org.apache.pekko:pekko-stream_2.12:1.0.1") extraDependency("org.apache.pekko:pekko-stream_2.12:1.0.1")
excludeInstrumentationName("tapir-pekko-http-server")
} }
pass { pass {
group.set("org.apache.pekko") group.set("org.apache.pekko")
@ -17,6 +18,7 @@ muzzle {
versions.set("[1.0,)") versions.set("[1.0,)")
assertInverse.set(true) assertInverse.set(true)
extraDependency("org.apache.pekko:pekko-stream_2.13:1.0.1") extraDependency("org.apache.pekko:pekko-stream_2.13:1.0.1")
excludeInstrumentationName("tapir-pekko-http-server")
} }
pass { pass {
group.set("org.apache.pekko") group.set("org.apache.pekko")
@ -24,21 +26,63 @@ muzzle {
versions.set("[1.0,)") versions.set("[1.0,)")
assertInverse.set(true) assertInverse.set(true)
extraDependency("org.apache.pekko:pekko-stream_3:1.0.1") extraDependency("org.apache.pekko:pekko-stream_3:1.0.1")
excludeInstrumentationName("tapir-pekko-http-server")
}
pass {
group.set("com.softwaremill.sttp.tapir")
module.set("tapir-pekko-http-server_2.12")
versions.set("[1.7,)")
assertInverse.set(true)
excludeInstrumentationName("pekko-http-server")
}
pass {
group.set("com.softwaremill.sttp.tapir")
module.set("tapir-pekko-http-server_2.13")
versions.set("[1.7,)")
assertInverse.set(true)
excludeInstrumentationName("pekko-http-server")
}
pass {
group.set("com.softwaremill.sttp.tapir")
module.set("tapir-pekko-http-server_3")
versions.set("[1.7,)")
assertInverse.set(true)
excludeInstrumentationName("pekko-http-server")
} }
} }
dependencies { dependencies {
library("org.apache.pekko:pekko-http_2.12:1.0.0") library("org.apache.pekko:pekko-http_2.12:1.0.0")
library("org.apache.pekko:pekko-stream_2.12:1.0.1") library("org.apache.pekko:pekko-stream_2.12:1.0.1")
compileOnly("com.softwaremill.sttp.tapir:tapir-pekko-http-server_2.12:1.7.0")
testImplementation("com.softwaremill.sttp.tapir:tapir-pekko-http-server_2.12:1.7.0")
testInstrumentation(project(":instrumentation:pekko:pekko-actor-1.0:javaagent")) testInstrumentation(project(":instrumentation:pekko:pekko-actor-1.0:javaagent"))
testInstrumentation(project(":instrumentation:executors:javaagent")) testInstrumentation(project(":instrumentation:executors:javaagent"))
latestDepTestLibrary("org.apache.pekko:pekko-http_2.13:latest.release") latestDepTestLibrary("org.apache.pekko:pekko-http_2.13:latest.release")
latestDepTestLibrary("org.apache.pekko:pekko-stream_2.13:latest.release") latestDepTestLibrary("org.apache.pekko:pekko-stream_2.13:latest.release")
latestDepTestLibrary("com.softwaremill.sttp.tapir:tapir-pekko-http-server_2.13:latest.release") }
testing {
suites {
val tapirTest by registering(JvmTestSuite::class) {
dependencies {
// this only exists to make Intellij happy since it doesn't (currently at least) understand our
// inclusion of this artifact inside :testing-common
compileOnly(project.dependencies.project(":testing:armeria-shaded-for-testing", configuration = "shadow"))
if (findProperty("testLatestDeps") as Boolean) {
implementation("com.typesafe.akka:akka-http_2.13:latest.release")
implementation("com.typesafe.akka:akka-stream_2.13:latest.release")
implementation("com.softwaremill.sttp.tapir:tapir-pekko-http-server_2.13:latest.release")
} else {
implementation("org.apache.pekko:pekko-http_2.12:1.0.0")
implementation("org.apache.pekko:pekko-stream_2.12:1.0.1")
implementation("com.softwaremill.sttp.tapir:tapir-pekko-http-server_2.12:1.7.0")
}
}
}
}
} }
tasks { tasks {
@ -51,6 +95,10 @@ tasks {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
} }
check {
dependsOn(testing.suites)
}
} }
if (findProperty("testLatestDeps") as Boolean) { if (findProperty("testLatestDeps") as Boolean) {
@ -59,7 +107,6 @@ if (findProperty("testLatestDeps") as Boolean) {
testImplementation { testImplementation {
exclude("org.apache.pekko", "pekko-http_2.12") exclude("org.apache.pekko", "pekko-http_2.12")
exclude("org.apache.pekko", "pekko-stream_2.12") exclude("org.apache.pekko", "pekko-stream_2.12")
exclude("com.softwaremill.sttp.tapir", "tapir-pekko-http-server_2.12")
} }
} }
} }

View File

@ -0,0 +1,66 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.tapir;
import io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route.PekkoRouteHolder;
import org.apache.pekko.http.scaladsl.server.RequestContext;
import org.apache.pekko.http.scaladsl.server.RouteResult;
import scala.Function1;
import scala.Function2;
import scala.Option;
import scala.PartialFunction;
import scala.Unit;
import scala.concurrent.Future;
import scala.util.Try;
import sttp.tapir.EndpointInput;
import sttp.tapir.server.ServerEndpoint;
public class RouteWrapper implements Function1<RequestContext, Future<RouteResult>> {
private final Function1<RequestContext, Future<RouteResult>> route;
private final ServerEndpoint<?, ?> serverEndpoint;
public RouteWrapper(
ServerEndpoint<?, ?> serverEndpoint, Function1<RequestContext, Future<RouteResult>> route) {
this.route = route;
this.serverEndpoint = serverEndpoint;
}
public class Finalizer implements PartialFunction<Try<RouteResult>, Unit> {
@Override
public boolean isDefinedAt(Try<RouteResult> tryResult) {
return tryResult.isSuccess();
}
@Override
public Unit apply(Try<RouteResult> tryResult) {
if (tryResult.isSuccess()) {
RouteResult result = tryResult.get();
if (result.getClass() == RouteResult.Complete.class) {
String path =
serverEndpoint.showPathTemplate(
(index, pc) ->
pc.name().isDefined() ? "{" + pc.name().get() + "}" : "{param" + index + "}",
Option.apply(
(Function2<Object, EndpointInput.Query<?>, String>)
(index, q) -> q.name() + "={" + q.name() + "}"),
false,
"*",
Option.apply("*"),
Option.apply("*"));
PekkoRouteHolder.push(path);
PekkoRouteHolder.endMatched();
}
}
return null;
}
}
@Override
public Future<RouteResult> apply(RequestContext ctx) {
return route.apply(ctx).andThen(new Finalizer(), ctx.executionContext());
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.tapir;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pekko.http.scaladsl.server.RequestContext;
import org.apache.pekko.http.scaladsl.server.RouteResult;
import scala.Function1;
import scala.concurrent.Future;
import sttp.tapir.server.ServerEndpoint;
public class TapirPathInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("toRoute").and(takesArgument(0, named("sttp.tapir.server.ServerEndpoint"))),
this.getClass().getName() + "$ApplyAdvice");
}
@SuppressWarnings("unused")
public static class ApplyAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Argument(0) ServerEndpoint<?, ?> endpoint,
@Advice.Return(readOnly = false) Function1<RequestContext, Future<RouteResult>> route) {
route = new RouteWrapper(endpoint, route);
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.tapir;
import static java.util.Collections.singletonList;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
import java.util.List;
@AutoService(InstrumentationModule.class)
public class TapirPekkoHttpServerRouteInstrumentationModule extends InstrumentationModule
implements ExperimentalInstrumentationModule {
public TapirPekkoHttpServerRouteInstrumentationModule() {
super(
"pekko-http",
"pekko-http-1.0",
"pekko-http-server",
"pekko-http-server-route",
"tapir-pekko-http-server",
"tapir-pekko-http-server-route");
}
@Override
public String getModuleGroup() {
return "pekko-server";
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new TapirPathInstrumentation());
}
}

View File

@ -0,0 +1,133 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension
import io.opentelemetry.sdk.testing.assertj.{SpanDataAssert, TraceAssert}
import io.opentelemetry.testing.internal.armeria.client.WebClient
import io.opentelemetry.testing.internal.armeria.common.{
AggregatedHttpRequest,
HttpMethod
}
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.server.Directives.{
IntNumber,
complete,
concat,
path,
pathEndOrSingleSlash,
pathPrefix,
pathSingleSlash
}
import org.apache.pekko.http.scaladsl.server.Route
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.extension.RegisterExtension
import org.junit.jupiter.api.{AfterAll, Test, TestInstance}
import sttp.tapir._
import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter
import java.net.{URI, URISyntaxException}
import java.util.function.Consumer
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TapirHttpServerRouteTest {
@RegisterExtension private val testing: AgentInstrumentationExtension =
AgentInstrumentationExtension.create
private val client: WebClient = WebClient.of()
implicit val system: ActorSystem = ActorSystem("my-system")
private def buildAddress(port: Int): URI = try
new URI("http://localhost:" + port + "/")
catch {
case exception: URISyntaxException =>
throw new IllegalStateException(exception)
}
@Test def testSimple(): Unit = {
val route = path("test") {
complete("ok")
}
test(route, "/test", "GET /test")
}
@Test def testRoute(): Unit = {
val route = concat(
pathEndOrSingleSlash {
complete("root")
},
pathPrefix("test") {
concat(
pathSingleSlash {
complete("test")
},
path(IntNumber) { _ =>
complete("ok")
}
)
}
)
test(route, "/test/1", "GET /test/*")
}
@Test def testTapirRoutes(): Unit = {
val interpreter = PekkoHttpServerInterpreter()(system.dispatcher)
def makeRoute(input: EndpointInput[Unit]) = {
interpreter.toRoute(
endpoint.get
.in(input)
.errorOut(stringBody)
.out(stringBody)
.serverLogicPure[Future](_ => Right("ok"))
)
}
val routes = concat(
concat(makeRoute("test" / "1"), makeRoute("test" / "2")),
concat(makeRoute("test" / "3"), makeRoute("test" / "4"))
)
test(routes, "/test/4", "GET /test/4")
}
def test(route: Route, path: String, spanName: String): Unit = {
val port = PortUtils.findOpenPort
val address: URI = buildAddress(port)
val binding =
Await.result(Http().bindAndHandle(route, "localhost", port), 10.seconds)
try {
val request = AggregatedHttpRequest.of(
HttpMethod.GET,
address.resolve(path).toString
)
val response = client.execute(request).aggregate.join
assertThat(response.status.code).isEqualTo(200)
assertThat(response.contentUtf8).isEqualTo("ok")
testing.waitAndAssertTraces(new Consumer[TraceAssert] {
override def accept(trace: TraceAssert): Unit =
trace.hasSpansSatisfyingExactly(new Consumer[SpanDataAssert] {
override def accept(span: SpanDataAssert): Unit = {
span.hasName(spanName)
}
})
})
} finally {
binding.unbind()
}
}
@AfterAll
def cleanUp(): Unit = {
system.terminate()
}
}

View File

@ -31,10 +31,8 @@ import org.junit.jupiter.api.{AfterAll, Test, TestInstance}
import java.net.{URI, URISyntaxException} import java.net.{URI, URISyntaxException}
import java.util.function.Consumer import java.util.function.Consumer
import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.Await
import scala.concurrent.duration.DurationInt import scala.concurrent.duration.DurationInt
import sttp.tapir._
import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter
@TestInstance(TestInstance.Lifecycle.PER_CLASS) @TestInstance(TestInstance.Lifecycle.PER_CLASS)
class PekkoHttpServerRouteTest { class PekkoHttpServerRouteTest {
@ -79,26 +77,6 @@ class PekkoHttpServerRouteTest {
test(route, "/test/1", "GET /test/*") test(route, "/test/1", "GET /test/*")
} }
@Test def testTapirRoutes(): Unit = {
val interpreter = PekkoHttpServerInterpreter()(system.dispatcher)
def makeRoute(input: EndpointInput[Unit]) = {
interpreter.toRoute(
endpoint.get
.in(input)
.errorOut(stringBody)
.out(stringBody)
.serverLogicPure[Future](_ => Right("ok"))
)
}
val routes = concat(
concat(makeRoute("test" / "1"), makeRoute("test" / "2")),
concat(makeRoute("test" / "3"), makeRoute("test" / "4"))
)
test(routes, "/test/4", "GET")
}
def test(route: Route, path: String, spanName: String): Unit = { def test(route: Route, path: String, spanName: String): Unit = {
val port = PortUtils.findOpenPort val port = PortUtils.findOpenPort
val address: URI = buildAddress(port) val address: URI = buildAddress(port)