Fix pekko route naming (#13491)

This commit is contained in:
Sam Wright 2025-03-27 00:14:32 +01:00 committed by GitHub
parent 93013f90eb
commit c096540523
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 292 additions and 180 deletions

View File

@ -9,6 +9,8 @@ import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentCo
import static io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.PekkoHttpServerSingletons.instrumenter;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute;
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource;
import io.opentelemetry.javaagent.bootstrap.http.HttpServerResponseCustomizerHolder;
import io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route.PekkoRouteHolder;
import java.util.ArrayDeque;
@ -143,6 +145,14 @@ public class PekkoHttpServerTracer
if (!headers.isEmpty()) {
response = (HttpResponse) response.addHeaders(headers);
}
PekkoRouteHolder routeHolder = PekkoRouteHolder.get(tracingRequest.context);
if (routeHolder != null) {
routeHolder.pushIfNotCompletelyMatched("*");
HttpServerRoute.update(
tracingRequest.context,
HttpServerRouteSource.CONTROLLER,
routeHolder.route());
}
instrumenter().end(tracingRequest.context, tracingRequest.request, response, null);
}

View File

@ -1,41 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route;
import static net.bytebuddy.matcher.ElementMatchers.named;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class PathConcatenationInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.pekko.http.scaladsl.server.PathMatcher");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("$anonfun$append$1"), this.getClass().getName() + "$ApplyAdvice");
}
@SuppressWarnings("unused")
public static class ApplyAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
// https://github.com/apache/incubator-pekko-http/blob/bea7d2b5c21e23d55556409226d136c282da27a3/http/src/main/scala/org/apache/pekko/http/scaladsl/server/PathMatcher.scala#L53
// https://github.com/apache/incubator-pekko-http/blob/bea7d2b5c21e23d55556409226d136c282da27a3/http/src/main/scala/org/apache/pekko/http/scaladsl/server/PathMatcher.scala#L57
// when routing dsl uses path("path1" / "path2") we are concatenating 3 segments "path1" and /
// and "path2" we need to notify the matcher that a new segment has started, so it could be
// captured in the route
PekkoRouteHolder.startSegment();
}
}
}

View File

@ -9,7 +9,9 @@ import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
@ -18,7 +20,6 @@ import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pekko.http.scaladsl.model.Uri;
import org.apache.pekko.http.scaladsl.server.PathMatcher;
import org.apache.pekko.http.scaladsl.server.PathMatchers;
import org.apache.pekko.http.scaladsl.server.PathMatchers$;
public class PathMatcherStaticInstrumentation implements TypeInstrumentation {
@Override
@ -43,11 +44,13 @@ public class PathMatcherStaticInstrumentation implements TypeInstrumentation {
@Advice.Argument(0) Uri.Path path,
@Advice.Return PathMatcher.Matching<?> result) {
// result is either matched or unmatched, we only care about the matches
Context context = Java8BytecodeBridge.currentContext();
PekkoRouteHolder routeHolder = PekkoRouteHolder.get(context);
if (routeHolder == null) {
return;
}
if (result.getClass() == PathMatcher.Matched.class) {
if (PathMatchers$.PathEnd$.class == pathMatcher.getClass()) {
PekkoRouteHolder.endMatched();
return;
}
PathMatcher.Matched<?> match = (PathMatcher.Matched<?>) result;
// if present use the matched path that was remembered in PathMatcherInstrumentation,
// otherwise just use a *
String prefix = VirtualField.find(PathMatcher.class, String.class).get(pathMatcher);
@ -58,9 +61,9 @@ public class PathMatcherStaticInstrumentation implements TypeInstrumentation {
prefix = "*";
}
}
if (prefix != null) {
PekkoRouteHolder.push(prefix);
}
routeHolder.push(path, match.pathRest(), prefix);
} else {
routeHolder.didNotMatch();
}
}
}

View File

@ -34,7 +34,6 @@ public class PekkoHttpServerRouteInstrumentationModule extends InstrumentationMo
return asList(
new PathMatcherInstrumentation(),
new PathMatcherStaticInstrumentation(),
new RouteConcatenationInstrumentation(),
new PathConcatenationInstrumentation());
new RouteConcatenationInstrumentation());
}
}

View File

@ -10,18 +10,17 @@ import static io.opentelemetry.context.ContextKey.named;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.ImplicitContextKeyed;
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute;
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource;
import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.pekko.http.scaladsl.model.Uri;
public class PekkoRouteHolder implements ImplicitContextKeyed {
private static final ContextKey<PekkoRouteHolder> KEY = named("opentelemetry-pekko-route");
private String route = "";
private boolean newSegment = true;
private boolean endMatched;
private final Deque<String> stack = new ArrayDeque<>();
private StringBuilder route = new StringBuilder();
private Uri.Path lastUnmatchedPath = null;
private boolean lastWasMatched = false;
private final Deque<State> savedStates = new ArrayDeque<>();
public static Context init(Context context) {
if (context.get(KEY) != null) {
@ -30,51 +29,51 @@ public class PekkoRouteHolder implements ImplicitContextKeyed {
return context.with(new PekkoRouteHolder());
}
public static void push(String path) {
PekkoRouteHolder holder = Context.current().get(KEY);
if (holder != null && holder.newSegment && !holder.endMatched) {
holder.route += path;
holder.newSegment = false;
public static PekkoRouteHolder get(Context context) {
return context.get(KEY);
}
public void push(Uri.Path beforeMatch, Uri.Path afterMatch, String pathToPush) {
// Only accept the suggested 'pathToPush' if:
// - either this is the first match, or
// - the unmatched part of the path from the previous match is what the current match
// acted upon. This avoids pushes from PathMatchers that compose other PathMatchers,
// instead only accepting pushes from leaf-nodes in the PathMatcher hierarchy that actually
// act on the path.
// AND:
// - some part of the path has now been matched by this matcher
if ((lastUnmatchedPath == null || lastUnmatchedPath.equals(beforeMatch))
&& !afterMatch.equals(beforeMatch)) {
route.append(pathToPush);
lastUnmatchedPath = afterMatch;
}
lastWasMatched = true;
}
public void didNotMatch() {
lastWasMatched = false;
}
public void pushIfNotCompletelyMatched(String pathToPush) {
if (lastUnmatchedPath != null && !lastUnmatchedPath.isEmpty()) {
route.append(pathToPush);
}
}
public static void startSegment() {
PekkoRouteHolder holder = Context.current().get(KEY);
if (holder != null) {
holder.newSegment = true;
}
public String route() {
return lastWasMatched ? route.toString() : null;
}
public static void endMatched() {
Context context = Context.current();
PekkoRouteHolder holder = context.get(KEY);
if (holder != null) {
holder.endMatched = true;
HttpServerRoute.update(context, HttpServerRouteSource.CONTROLLER, holder.route);
}
public void save() {
savedStates.add(new State(lastUnmatchedPath, route));
route = new StringBuilder(route);
}
public static void save() {
PekkoRouteHolder holder = Context.current().get(KEY);
if (holder != null) {
holder.stack.push(holder.route);
holder.newSegment = true;
}
}
public static void reset() {
PekkoRouteHolder holder = Context.current().get(KEY);
if (holder != null) {
holder.route = holder.stack.peek();
holder.newSegment = true;
}
}
public static void restore() {
PekkoRouteHolder holder = Context.current().get(KEY);
if (holder != null) {
holder.route = holder.stack.pop();
holder.newSegment = true;
public void restore() {
State popped = savedStates.pollLast();
if (popped != null) {
lastUnmatchedPath = popped.lastUnmatchedPath;
route = popped.route;
}
}
@ -84,4 +83,14 @@ public class PekkoRouteHolder implements ImplicitContextKeyed {
}
private PekkoRouteHolder() {}
private static class State {
private final Uri.Path lastUnmatchedPath;
private final StringBuilder route;
private State(Uri.Path lastUnmatchedPath, StringBuilder route) {
this.lastUnmatchedPath = lastUnmatchedPath;
this.route = route;
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import org.apache.pekko.http.scaladsl.server.RequestContext;
import org.apache.pekko.http.scaladsl.server.RouteResult;
import scala.Function1;
import scala.concurrent.Future;
public class PekkoRouteWrapper implements Function1<RequestContext, Future<RouteResult>> {
private final Function1<RequestContext, Future<RouteResult>> route;
public PekkoRouteWrapper(Function1<RequestContext, Future<RouteResult>> route) {
this.route = route;
}
@Override
public Future<RouteResult> apply(RequestContext ctx) {
Context context = Java8BytecodeBridge.currentContext();
PekkoRouteHolder routeHolder = PekkoRouteHolder.get(context);
if (routeHolder == null) {
return route.apply(ctx);
} else {
routeHolder.save();
return route
.apply(ctx)
.map(
result -> {
if (result.getClass() == RouteResult.Rejected.class) {
routeHolder.restore();
}
return result;
},
ctx.executionContext());
}
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route;
import org.apache.pekko.http.scaladsl.server.RouteResult;
import scala.PartialFunction;
import scala.Unit;
import scala.util.Try;
public class RestoreOnExit implements PartialFunction<Try<RouteResult>, Unit> {
@Override
public boolean isDefinedAt(Try<RouteResult> x) {
return true;
}
@Override
public Unit apply(Try<RouteResult> v1) {
PekkoRouteHolder.restore();
return null;
}
}

View File

@ -5,6 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
@ -14,6 +15,7 @@ import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pekko.http.scaladsl.server.RequestContext;
import org.apache.pekko.http.scaladsl.server.RouteResult;
import scala.Function1;
import scala.concurrent.Future;
public class RouteConcatenationInstrumentation implements TypeInstrumentation {
@ -24,42 +26,18 @@ public class RouteConcatenationInstrumentation implements TypeInstrumentation {
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("$anonfun$$tilde$1"), this.getClass().getName() + "$ApplyAdvice");
transformer.applyAdviceToMethod(
named("$anonfun$$tilde$2"), this.getClass().getName() + "$Apply2Advice");
transformer.applyAdviceToMethod(isConstructor(), this.getClass().getName() + "$ApplyAdvice");
transformer.applyAdviceToMethod(named("$tilde"), this.getClass().getName() + "$ApplyAdvice");
}
@SuppressWarnings("unused")
public static class ApplyAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
// when routing dsl uses concat(path(...) {...}, path(...) {...}) we'll restore the currently
// matched route after each matcher so that match attempts that failed wouldn't get recorded
// in the route
PekkoRouteHolder.save();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.Argument(value = 2) RequestContext ctx,
@Advice.Return(readOnly = false) Future<RouteResult> future,
@Advice.Thrown Throwable throwable) {
if (throwable != null) {
PekkoRouteHolder.restore();
} else {
future = future.andThen(new RestoreOnExit(), ctx.executionContext());
}
}
}
@SuppressWarnings("unused")
public static class Apply2Advice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter() {
PekkoRouteHolder.reset();
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false)
Function1<RequestContext, Future<RouteResult>> route) {
route = new PekkoRouteWrapper(route);
}
}
}

View File

@ -5,7 +5,11 @@
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.tapir;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route.PekkoRouteHolder;
import java.nio.charset.Charset;
import org.apache.pekko.http.scaladsl.model.Uri;
import org.apache.pekko.http.scaladsl.server.RequestContext;
import org.apache.pekko.http.scaladsl.server.RouteResult;
import scala.Function1;
@ -19,6 +23,7 @@ import sttp.tapir.EndpointInput;
import sttp.tapir.server.ServerEndpoint;
public class RouteWrapper implements Function1<RequestContext, Future<RouteResult>> {
private static final Uri.Path EMPTY = Uri.Path$.MODULE$.apply("", Charset.defaultCharset());
private final Function1<RequestContext, Future<RouteResult>> route;
private final ServerEndpoint<?, ?> serverEndpoint;
@ -28,7 +33,13 @@ public class RouteWrapper implements Function1<RequestContext, Future<RouteResul
this.serverEndpoint = serverEndpoint;
}
public class Finalizer implements PartialFunction<Try<RouteResult>, Unit> {
class Finalizer implements PartialFunction<Try<RouteResult>, Unit> {
private final Uri.Path beforeMatch;
public Finalizer(Uri.Path beforeMatch) {
this.beforeMatch = beforeMatch;
}
@Override
public boolean isDefinedAt(Try<RouteResult> tryResult) {
return tryResult.isSuccess();
@ -36,7 +47,9 @@ public class RouteWrapper implements Function1<RequestContext, Future<RouteResul
@Override
public Unit apply(Try<RouteResult> tryResult) {
if (tryResult.isSuccess()) {
Context context = Java8BytecodeBridge.currentContext();
PekkoRouteHolder routeHolder = PekkoRouteHolder.get(context);
if (routeHolder != null && tryResult.isSuccess()) {
RouteResult result = tryResult.get();
if (result.getClass() == RouteResult.Complete.class) {
String path =
@ -50,9 +63,7 @@ public class RouteWrapper implements Function1<RequestContext, Future<RouteResul
"*",
Option.apply("*"),
Option.apply("*"));
PekkoRouteHolder.push(path);
PekkoRouteHolder.endMatched();
routeHolder.push(beforeMatch, EMPTY, path);
}
}
return null;
@ -61,6 +72,6 @@ public class RouteWrapper implements Function1<RequestContext, Future<RouteResul
@Override
public Future<RouteResult> apply(RequestContext ctx) {
return route.apply(ctx).andThen(new Finalizer(), ctx.executionContext());
return route.apply(ctx).andThen(new Finalizer(ctx.unmatchedPath()), ctx.executionContext());
}
}

View File

@ -15,15 +15,7 @@ import io.opentelemetry.testing.internal.armeria.common.{
}
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.server.Directives.{
IntNumber,
complete,
concat,
path,
pathEndOrSingleSlash,
pathPrefix,
pathSingleSlash
}
import org.apache.pekko.http.scaladsl.server.Directives.{concat, pathPrefix}
import org.apache.pekko.http.scaladsl.server.Route
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.extension.RegisterExtension
@ -52,6 +44,7 @@ class TapirHttpServerRouteTest {
}
@Test def testSimple(): Unit = {
import org.apache.pekko.http.scaladsl.server.Directives._
val route = path("test") {
complete("ok")
}
@ -60,6 +53,7 @@ class TapirHttpServerRouteTest {
}
@Test def testRoute(): Unit = {
import org.apache.pekko.http.scaladsl.server.Directives._
val route = concat(
pathEndOrSingleSlash {
complete("root")
@ -99,6 +93,21 @@ class TapirHttpServerRouteTest {
test(routes, "/test/4", "GET /test/4")
}
@Test def testTapirWithPathPrefix(): Unit = {
val interpreter = PekkoHttpServerInterpreter()(system.dispatcher)
val tapirRoute = interpreter.toRoute(
endpoint.get
.in(path[Int]("i") / "bar")
.errorOut(stringBody)
.out(stringBody)
.serverLogicPure[Future](_ => Right("ok"))
)
val prefixedRoute = pathPrefix("foo") { tapirRoute }
test(prefixedRoute, "/foo/123/bar", "GET /foo/{i}/bar")
}
def test(route: Route, path: String, spanName: String): Unit = {
val port = PortUtils.findOpenPort
val address: URI = buildAddress(port)

View File

@ -15,16 +15,8 @@ import io.opentelemetry.testing.internal.armeria.common.{
}
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.http.scaladsl.server.Route
import org.apache.pekko.http.scaladsl.server.Directives.{
IntNumber,
complete,
concat,
path,
pathEndOrSingleSlash,
pathPrefix,
pathSingleSlash
}
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.extension.RegisterExtension
import org.junit.jupiter.api.{AfterAll, Test, TestInstance}
@ -57,27 +49,151 @@ class PekkoHttpServerRouteTest {
test(route, "/test", "GET /test")
}
@Test def testRoute(): Unit = {
@Test def testPathPrefix(): Unit = {
import org.apache.pekko.http.scaladsl.server.Directives._
val route =
pathPrefix("a") {
pathPrefix("b") {
path("c") {
complete("ok")
}
}
}
test(route, "/a/b/c", "GET /a/b/c")
}
@Test def testTrailingSlash(): Unit = {
val route = path("foo"./) { complete("ok") }
test(route, "/foo/", "GET /foo/")
}
@Test def testSlash(): Unit = {
val route = path("foo" / "bar") { complete("ok") }
test(route, "/foo/bar", "GET /foo/bar")
}
@Test def testEncodedSlash(): Unit = {
val route = path("foo/bar") { complete("ok") }
test(route, "/foo%2Fbar", "GET /foo%2Fbar")
}
@Test def testSeparateOnSlashes(): Unit = {
val route = path(separateOnSlashes("foo/bar")) { complete("ok") }
test(route, "/foo/bar", "GET /foo/bar")
}
@Test def testMatchRegex(): Unit = {
val route = path("foo" / """number-\d+""".r) { _ => complete("ok") }
test(route, "/foo/number-123", "GET /foo/*")
}
@Test def testPipe(): Unit = {
val route = path("i" ~ IntNumber | "h" ~ HexIntNumber) { _ =>
complete("ok")
}
test(route, "/i42", "GET /i*")
test(route, "/hCAFE", "GET /h*")
}
@Test def testMapExtractor(): Unit = {
val route = path("colours" / Map("red" -> 1, "green" -> 2, "blue" -> 3)) {
_ => complete("ok")
}
test(route, "/colours/red", "GET /colours/red")
test(route, "/colours/green", "GET /colours/green")
}
@Test def testNotMatch(): Unit = {
val route = pathPrefix("foo" ~ not("bar")) { complete("ok") }
test(route, "/fooish", "GET /foo*")
test(route, "/fooish/123", "GET /foo*")
}
@Test def testProvide(): Unit = {
val route = pathPrefix("foo") {
provide("hi") { _ =>
path("bar") {
complete("ok")
}
}
}
test(route, "/foo/bar", "GET /foo/bar")
}
@Test def testOptional(): Unit = {
val route = path("foo" / "bar" / "X" ~ IntNumber.? / ("edit" | "create")) {
_ => complete("ok")
}
test(route, "/foo/bar/X42/edit", "GET /foo/bar/X*/edit")
test(route, "/foo/bar/X/edit", "GET /foo/bar/X/edit")
}
@Test def testNoMatches(): Unit = {
val route = path("foo" / "bar") { complete("ok") }
test(
route,
"/foo/wrong",
"GET",
404,
"The requested resource could not be found."
)
}
@Test def testError(): Unit = {
val route = path("foo" / IntNumber) { _ =>
failWith(new RuntimeException("oops"))
}
test(
route,
"/foo/123",
"GET /foo/*",
500,
"There was an internal server error."
)
}
@Test def testConcat(): Unit = {
val route = concat(
pathEndOrSingleSlash {
complete("root")
},
path(".+".r / "wrong1") { _ =>
complete("wrong1")
},
pathPrefix("test") {
concat(
pathSingleSlash {
complete("test")
},
path(IntNumber) { _ =>
complete("ok")
pathPrefix("foo") {
concat(
path(IntNumber) { _ =>
complete("ok")
}
)
},
path("something-else") {
complete("test")
}
)
},
path("test" / "wrong2") {
complete("wrong2")
}
)
test(route, "/test/1", "GET /test/*")
test(route, "/test/foo/1", "GET /test/foo/*")
}
def test(route: Route, path: String, spanName: String): Unit = {
def test(
route: Route,
path: String,
spanName: String,
expectedStatus: Int = 200,
expectedMsg: String = "ok"
): Unit = {
testing.clearData()
val port = PortUtils.findOpenPort
val address: URI = buildAddress(port)
val binding =
@ -88,8 +204,8 @@ class PekkoHttpServerRouteTest {
address.resolve(path).toString
)
val response = client.execute(request).aggregate.join
assertThat(response.status.code).isEqualTo(200)
assertThat(response.contentUtf8).isEqualTo("ok")
assertThat(response.status.code).isEqualTo(expectedStatus)
assertThat(response.contentUtf8).isEqualTo(expectedMsg)
testing.waitAndAssertTraces(new Consumer[TraceAssert] {
override def accept(trace: TraceAssert): Unit =