Improve akka route handling with java dsl (#11926)
This commit is contained in:
parent
d15927a8bc
commit
42d7177222
|
@ -42,6 +42,22 @@ dependencies {
|
||||||
latestDepTestLibrary("com.typesafe.akka:akka-stream_2.13:+")
|
latestDepTestLibrary("com.typesafe.akka:akka-stream_2.13:+")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
testing {
|
||||||
|
suites {
|
||||||
|
val javaRouteTest by registering(JvmTestSuite::class) {
|
||||||
|
dependencies {
|
||||||
|
if (findProperty("testLatestDeps") as Boolean) {
|
||||||
|
implementation("com.typesafe.akka:akka-http_2.13:+")
|
||||||
|
implementation("com.typesafe.akka:akka-stream_2.13:+")
|
||||||
|
} else {
|
||||||
|
implementation("com.typesafe.akka:akka-http_2.12:10.2.0")
|
||||||
|
implementation("com.typesafe.akka:akka-stream_2.12:2.6.21")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tasks {
|
tasks {
|
||||||
withType<Test>().configureEach {
|
withType<Test>().configureEach {
|
||||||
// required on jdk17
|
// required on jdk17
|
||||||
|
@ -52,6 +68,10 @@ tasks {
|
||||||
|
|
||||||
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
|
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
check {
|
||||||
|
dependsOn(testing.suites)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (findProperty("testLatestDeps") as Boolean) {
|
if (findProperty("testLatestDeps") as Boolean) {
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.javaagent.instrumentation.akkahttp;
|
||||||
|
|
||||||
|
import static akka.http.javadsl.server.PathMatchers.integerSegment;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
import akka.actor.ActorSystem;
|
||||||
|
import akka.http.javadsl.Http;
|
||||||
|
import akka.http.javadsl.ServerBinding;
|
||||||
|
import akka.http.javadsl.server.AllDirectives;
|
||||||
|
import akka.http.javadsl.server.Route;
|
||||||
|
import io.opentelemetry.instrumentation.test.utils.PortUtils;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||||
|
import io.opentelemetry.testing.internal.armeria.client.WebClient;
|
||||||
|
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpRequest;
|
||||||
|
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse;
|
||||||
|
import io.opentelemetry.testing.internal.armeria.common.HttpMethod;
|
||||||
|
import java.util.concurrent.CompletionStage;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
|
||||||
|
class AkkaHttpServerJavaRouteTest extends AllDirectives {
|
||||||
|
@RegisterExtension
|
||||||
|
private static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||||
|
|
||||||
|
private final WebClient client = WebClient.of();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testRoute() {
|
||||||
|
ActorSystem system = ActorSystem.create("my-system");
|
||||||
|
int port = PortUtils.findOpenPort();
|
||||||
|
Http http = Http.get(system);
|
||||||
|
|
||||||
|
Route route =
|
||||||
|
concat(
|
||||||
|
pathEndOrSingleSlash(() -> complete("root")),
|
||||||
|
pathPrefix(
|
||||||
|
"test",
|
||||||
|
() ->
|
||||||
|
concat(
|
||||||
|
pathSingleSlash(() -> complete("test")),
|
||||||
|
path(integerSegment(), (i) -> complete("ok")))));
|
||||||
|
|
||||||
|
CompletionStage<ServerBinding> binding = http.newServerAt("localhost", port).bind(route);
|
||||||
|
try {
|
||||||
|
AggregatedHttpRequest request =
|
||||||
|
AggregatedHttpRequest.of(HttpMethod.GET, "h1c://localhost:" + port + "/test/1");
|
||||||
|
AggregatedHttpResponse response = client.execute(request).aggregate().join();
|
||||||
|
|
||||||
|
assertThat(response.status().code()).isEqualTo(200);
|
||||||
|
assertThat(response.contentUtf8()).isEqualTo("ok");
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace -> trace.hasSpansSatisfyingExactly(span -> span.hasName("GET /test/*")));
|
||||||
|
} finally {
|
||||||
|
binding.thenCompose(ServerBinding::unbind).thenAccept(unbound -> system.terminate());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,7 +19,7 @@ public class AkkaRouteHolder implements ImplicitContextKeyed {
|
||||||
private static final ContextKey<AkkaRouteHolder> KEY = named("opentelemetry-akka-route");
|
private static final ContextKey<AkkaRouteHolder> KEY = named("opentelemetry-akka-route");
|
||||||
|
|
||||||
private String route = "";
|
private String route = "";
|
||||||
private boolean newSegment;
|
private boolean newSegment = true;
|
||||||
private boolean endMatched;
|
private boolean endMatched;
|
||||||
private final Deque<String> stack = new ArrayDeque<>();
|
private final Deque<String> stack = new ArrayDeque<>();
|
||||||
|
|
||||||
|
@ -70,6 +70,15 @@ public class AkkaRouteHolder implements ImplicitContextKeyed {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reset the state to when save was called
|
||||||
|
public static void reset() {
|
||||||
|
AkkaRouteHolder holder = Context.current().get(KEY);
|
||||||
|
if (holder != null) {
|
||||||
|
holder.route = holder.stack.peek();
|
||||||
|
holder.newSegment = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Context storeInContext(Context context) {
|
public Context storeInContext(Context context) {
|
||||||
return context.with(KEY, this);
|
return context.with(KEY, this);
|
||||||
|
|
|
@ -17,14 +17,26 @@ public class RouteConcatenationInstrumentation implements TypeInstrumentation {
|
||||||
@Override
|
@Override
|
||||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||||
return namedOneOf(
|
return namedOneOf(
|
||||||
|
// scala 2.11
|
||||||
"akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1",
|
"akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1",
|
||||||
|
// scala 2.12 and later
|
||||||
"akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation");
|
"akka.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void transform(TypeTransformer transformer) {
|
public void transform(TypeTransformer transformer) {
|
||||||
transformer.applyAdviceToMethod(
|
transformer.applyAdviceToMethod(
|
||||||
namedOneOf("apply", "$anonfun$$tilde$1"), this.getClass().getName() + "$ApplyAdvice");
|
namedOneOf(
|
||||||
|
// scala 2.11
|
||||||
|
"apply",
|
||||||
|
// scala 2.12 and later
|
||||||
|
"$anonfun$$tilde$1"),
|
||||||
|
this.getClass().getName() + "$ApplyAdvice");
|
||||||
|
|
||||||
|
// This advice seems to be only needed when defining routes with java dsl. Since java dsl tests
|
||||||
|
// use scala 2.12 we are going to skip instrumenting this for scala 2.11.
|
||||||
|
transformer.applyAdviceToMethod(
|
||||||
|
namedOneOf("$anonfun$$tilde$2"), this.getClass().getName() + "$Apply2Advice");
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
|
@ -43,4 +55,13 @@ public class RouteConcatenationInstrumentation implements TypeInstrumentation {
|
||||||
AkkaRouteHolder.restore();
|
AkkaRouteHolder.restore();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
public static class Apply2Advice {
|
||||||
|
|
||||||
|
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||||
|
public static void onEnter() {
|
||||||
|
AkkaRouteHolder.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,113 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.javaagent.instrumentation.akkahttp
|
||||||
|
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.http.scaladsl.Http
|
||||||
|
import akka.http.scaladsl.server.Directives.{
|
||||||
|
IntNumber,
|
||||||
|
complete,
|
||||||
|
concat,
|
||||||
|
path,
|
||||||
|
pathEndOrSingleSlash,
|
||||||
|
pathPrefix,
|
||||||
|
pathSingleSlash
|
||||||
|
}
|
||||||
|
import akka.http.scaladsl.server.Route
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension
|
||||||
|
import io.opentelemetry.sdk.testing.assertj.{SpanDataAssert, TraceAssert}
|
||||||
|
import io.opentelemetry.testing.internal.armeria.client.WebClient
|
||||||
|
import io.opentelemetry.testing.internal.armeria.common.{
|
||||||
|
AggregatedHttpRequest,
|
||||||
|
HttpMethod
|
||||||
|
}
|
||||||
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
|
import org.junit.jupiter.api.{AfterAll, Test, TestInstance}
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension
|
||||||
|
|
||||||
|
import java.net.{URI, URISyntaxException}
|
||||||
|
import java.util.function.Consumer
|
||||||
|
import scala.concurrent.duration.DurationInt
|
||||||
|
import scala.concurrent.Await
|
||||||
|
|
||||||
|
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||||
|
class AkkaHttpServerRouteTest {
|
||||||
|
@RegisterExtension private val testing: AgentInstrumentationExtension =
|
||||||
|
AgentInstrumentationExtension.create
|
||||||
|
private val client: WebClient = WebClient.of()
|
||||||
|
|
||||||
|
implicit val system: ActorSystem = ActorSystem("my-system")
|
||||||
|
implicit val materializer: ActorMaterializer = ActorMaterializer()
|
||||||
|
|
||||||
|
private def buildAddress(port: Int): URI = try
|
||||||
|
new URI("http://localhost:" + port + "/")
|
||||||
|
catch {
|
||||||
|
case exception: URISyntaxException =>
|
||||||
|
throw new IllegalStateException(exception)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test def testSimple(): Unit = {
|
||||||
|
val route = path("test") {
|
||||||
|
complete("ok")
|
||||||
|
}
|
||||||
|
|
||||||
|
test(route, "/test", "GET /test")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test def testRoute(): Unit = {
|
||||||
|
val route = concat(
|
||||||
|
pathEndOrSingleSlash {
|
||||||
|
complete("root")
|
||||||
|
},
|
||||||
|
pathPrefix("test") {
|
||||||
|
concat(
|
||||||
|
pathSingleSlash {
|
||||||
|
complete("test")
|
||||||
|
},
|
||||||
|
path(IntNumber) { _ =>
|
||||||
|
complete("ok")
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
test(route, "/test/1", "GET /test/*")
|
||||||
|
}
|
||||||
|
|
||||||
|
def test(route: Route, path: String, spanName: String): Unit = {
|
||||||
|
val port = PortUtils.findOpenPort
|
||||||
|
val address: URI = buildAddress(port)
|
||||||
|
val binding =
|
||||||
|
Await.result(Http().bindAndHandle(route, "localhost", port), 10.seconds)
|
||||||
|
try {
|
||||||
|
val request = AggregatedHttpRequest.of(
|
||||||
|
HttpMethod.GET,
|
||||||
|
address.resolve(path).toString
|
||||||
|
)
|
||||||
|
val response = client.execute(request).aggregate.join
|
||||||
|
assertThat(response.status.code).isEqualTo(200)
|
||||||
|
assertThat(response.contentUtf8).isEqualTo("ok")
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(new Consumer[TraceAssert] {
|
||||||
|
override def accept(trace: TraceAssert): Unit =
|
||||||
|
trace.hasSpansSatisfyingExactly(new Consumer[SpanDataAssert] {
|
||||||
|
override def accept(span: SpanDataAssert): Unit = {
|
||||||
|
span.hasName(spanName)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
} finally {
|
||||||
|
binding.unbind()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
def cleanUp(): Unit = {
|
||||||
|
system.terminate()
|
||||||
|
}
|
||||||
|
}
|
|
@ -5,7 +5,7 @@
|
||||||
|
|
||||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route;
|
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route;
|
||||||
|
|
||||||
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
|
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||||
|
|
||||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||||
|
@ -16,15 +16,13 @@ import net.bytebuddy.matcher.ElementMatcher;
|
||||||
public class PathConcatenationInstrumentation implements TypeInstrumentation {
|
public class PathConcatenationInstrumentation implements TypeInstrumentation {
|
||||||
@Override
|
@Override
|
||||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||||
return namedOneOf(
|
return named("org.apache.pekko.http.scaladsl.server.PathMatcher");
|
||||||
"org.apache.pekko.http.scaladsl.server.PathMatcher$$anonfun$$tilde$1",
|
|
||||||
"org.apache.pekko.http.scaladsl.server.PathMatcher");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void transform(TypeTransformer transformer) {
|
public void transform(TypeTransformer transformer) {
|
||||||
transformer.applyAdviceToMethod(
|
transformer.applyAdviceToMethod(
|
||||||
namedOneOf("apply", "$anonfun$append$1"), this.getClass().getName() + "$ApplyAdvice");
|
named("$anonfun$append$1"), this.getClass().getName() + "$ApplyAdvice");
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
|
|
|
@ -19,7 +19,7 @@ public class PekkoRouteHolder implements ImplicitContextKeyed {
|
||||||
private static final ContextKey<PekkoRouteHolder> KEY = named("opentelemetry-pekko-route");
|
private static final ContextKey<PekkoRouteHolder> KEY = named("opentelemetry-pekko-route");
|
||||||
|
|
||||||
private String route = "";
|
private String route = "";
|
||||||
private boolean newSegment;
|
private boolean newSegment = true;
|
||||||
private boolean endMatched;
|
private boolean endMatched;
|
||||||
private final Deque<String> stack = new ArrayDeque<>();
|
private final Deque<String> stack = new ArrayDeque<>();
|
||||||
|
|
||||||
|
@ -62,6 +62,14 @@ public class PekkoRouteHolder implements ImplicitContextKeyed {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void reset() {
|
||||||
|
PekkoRouteHolder holder = Context.current().get(KEY);
|
||||||
|
if (holder != null) {
|
||||||
|
holder.route = holder.stack.peek();
|
||||||
|
holder.newSegment = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void restore() {
|
public static void restore() {
|
||||||
PekkoRouteHolder holder = Context.current().get(KEY);
|
PekkoRouteHolder holder = Context.current().get(KEY);
|
||||||
if (holder != null) {
|
if (holder != null) {
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
|
|
||||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route;
|
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route;
|
||||||
|
|
||||||
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
|
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||||
|
|
||||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||||
|
@ -16,15 +16,15 @@ import net.bytebuddy.matcher.ElementMatcher;
|
||||||
public class RouteConcatenationInstrumentation implements TypeInstrumentation {
|
public class RouteConcatenationInstrumentation implements TypeInstrumentation {
|
||||||
@Override
|
@Override
|
||||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||||
return namedOneOf(
|
return named("org.apache.pekko.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation");
|
||||||
"org.apache.pekko.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation$$anonfun$$tilde$1",
|
|
||||||
"org.apache.pekko.http.scaladsl.server.RouteConcatenation$RouteWithConcatenation");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void transform(TypeTransformer transformer) {
|
public void transform(TypeTransformer transformer) {
|
||||||
transformer.applyAdviceToMethod(
|
transformer.applyAdviceToMethod(
|
||||||
namedOneOf("apply", "$anonfun$$tilde$1"), this.getClass().getName() + "$ApplyAdvice");
|
named("$anonfun$$tilde$1"), this.getClass().getName() + "$ApplyAdvice");
|
||||||
|
transformer.applyAdviceToMethod(
|
||||||
|
named("$anonfun$$tilde$2"), this.getClass().getName() + "$Apply2Advice");
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
|
@ -43,4 +43,13 @@ public class RouteConcatenationInstrumentation implements TypeInstrumentation {
|
||||||
PekkoRouteHolder.restore();
|
PekkoRouteHolder.restore();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
public static class Apply2Advice {
|
||||||
|
|
||||||
|
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||||
|
public static void onEnter() {
|
||||||
|
PekkoRouteHolder.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0;
|
||||||
|
|
||||||
|
import static org.apache.pekko.http.javadsl.server.PathMatchers.integerSegment;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
import io.opentelemetry.instrumentation.test.utils.PortUtils;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||||
|
import io.opentelemetry.testing.internal.armeria.client.WebClient;
|
||||||
|
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpRequest;
|
||||||
|
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse;
|
||||||
|
import io.opentelemetry.testing.internal.armeria.common.HttpMethod;
|
||||||
|
import java.util.concurrent.CompletionStage;
|
||||||
|
import org.apache.pekko.actor.ActorSystem;
|
||||||
|
import org.apache.pekko.http.javadsl.Http;
|
||||||
|
import org.apache.pekko.http.javadsl.ServerBinding;
|
||||||
|
import org.apache.pekko.http.javadsl.server.AllDirectives;
|
||||||
|
import org.apache.pekko.http.javadsl.server.Route;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
|
||||||
|
class PekkoHttpServerJavaRouteTest extends AllDirectives {
|
||||||
|
@RegisterExtension
|
||||||
|
private static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||||
|
|
||||||
|
private final WebClient client = WebClient.of();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testRoute() {
|
||||||
|
ActorSystem system = ActorSystem.create("my-system");
|
||||||
|
int port = PortUtils.findOpenPort();
|
||||||
|
Http http = Http.get(system);
|
||||||
|
|
||||||
|
Route route =
|
||||||
|
concat(
|
||||||
|
pathEndOrSingleSlash(() -> complete("root")),
|
||||||
|
pathPrefix(
|
||||||
|
"test",
|
||||||
|
() ->
|
||||||
|
concat(
|
||||||
|
pathSingleSlash(() -> complete("test")),
|
||||||
|
path(integerSegment(), (i) -> complete("ok")))));
|
||||||
|
|
||||||
|
CompletionStage<ServerBinding> binding = http.newServerAt("localhost", port).bind(route);
|
||||||
|
try {
|
||||||
|
AggregatedHttpRequest request =
|
||||||
|
AggregatedHttpRequest.of(HttpMethod.GET, "h1c://localhost:" + port + "/test/1");
|
||||||
|
AggregatedHttpResponse response = client.execute(request).aggregate().join();
|
||||||
|
|
||||||
|
assertThat(response.status().code()).isEqualTo(200);
|
||||||
|
assertThat(response.contentUtf8()).isEqualTo("ok");
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(
|
||||||
|
trace -> trace.hasSpansSatisfyingExactly(span -> span.hasName("GET /test/*")));
|
||||||
|
} finally {
|
||||||
|
binding.thenCompose(ServerBinding::unbind).thenAccept(unbound -> system.terminate());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,113 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0
|
||||||
|
|
||||||
|
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
||||||
|
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension
|
||||||
|
import io.opentelemetry.sdk.testing.assertj.{SpanDataAssert, TraceAssert}
|
||||||
|
import io.opentelemetry.testing.internal.armeria.client.WebClient
|
||||||
|
import io.opentelemetry.testing.internal.armeria.common.{
|
||||||
|
AggregatedHttpRequest,
|
||||||
|
HttpMethod
|
||||||
|
}
|
||||||
|
import org.apache.pekko.actor.ActorSystem
|
||||||
|
import org.apache.pekko.http.scaladsl.Http
|
||||||
|
import org.apache.pekko.http.scaladsl.server.Route
|
||||||
|
import org.apache.pekko.http.scaladsl.server.Directives.{
|
||||||
|
IntNumber,
|
||||||
|
complete,
|
||||||
|
concat,
|
||||||
|
path,
|
||||||
|
pathEndOrSingleSlash,
|
||||||
|
pathPrefix,
|
||||||
|
pathSingleSlash
|
||||||
|
}
|
||||||
|
import org.apache.pekko.stream.ActorMaterializer
|
||||||
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension
|
||||||
|
import org.junit.jupiter.api.{AfterAll, Test, TestInstance}
|
||||||
|
|
||||||
|
import java.net.{URI, URISyntaxException}
|
||||||
|
import java.util.function.Consumer
|
||||||
|
import scala.concurrent.Await
|
||||||
|
import scala.concurrent.duration.DurationInt
|
||||||
|
|
||||||
|
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||||
|
class PekkoHttpServerRouteTest {
|
||||||
|
@RegisterExtension private val testing: AgentInstrumentationExtension =
|
||||||
|
AgentInstrumentationExtension.create
|
||||||
|
private val client: WebClient = WebClient.of()
|
||||||
|
|
||||||
|
implicit val system: ActorSystem = ActorSystem("my-system")
|
||||||
|
implicit val materializer: ActorMaterializer = ActorMaterializer()
|
||||||
|
|
||||||
|
private def buildAddress(port: Int): URI = try
|
||||||
|
new URI("http://localhost:" + port + "/")
|
||||||
|
catch {
|
||||||
|
case exception: URISyntaxException =>
|
||||||
|
throw new IllegalStateException(exception)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test def testSimple(): Unit = {
|
||||||
|
val route = path("test") {
|
||||||
|
complete("ok")
|
||||||
|
}
|
||||||
|
|
||||||
|
test(route, "/test", "GET /test")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test def testRoute(): Unit = {
|
||||||
|
val route = concat(
|
||||||
|
pathEndOrSingleSlash {
|
||||||
|
complete("root")
|
||||||
|
},
|
||||||
|
pathPrefix("test") {
|
||||||
|
concat(
|
||||||
|
pathSingleSlash {
|
||||||
|
complete("test")
|
||||||
|
},
|
||||||
|
path(IntNumber) { _ =>
|
||||||
|
complete("ok")
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
test(route, "/test/1", "GET /test/*")
|
||||||
|
}
|
||||||
|
|
||||||
|
def test(route: Route, path: String, spanName: String): Unit = {
|
||||||
|
val port = PortUtils.findOpenPort
|
||||||
|
val address: URI = buildAddress(port)
|
||||||
|
val binding =
|
||||||
|
Await.result(Http().bindAndHandle(route, "localhost", port), 10.seconds)
|
||||||
|
try {
|
||||||
|
val request = AggregatedHttpRequest.of(
|
||||||
|
HttpMethod.GET,
|
||||||
|
address.resolve(path).toString
|
||||||
|
)
|
||||||
|
val response = client.execute(request).aggregate.join
|
||||||
|
assertThat(response.status.code).isEqualTo(200)
|
||||||
|
assertThat(response.contentUtf8).isEqualTo("ok")
|
||||||
|
|
||||||
|
testing.waitAndAssertTraces(new Consumer[TraceAssert] {
|
||||||
|
override def accept(trace: TraceAssert): Unit =
|
||||||
|
trace.hasSpansSatisfyingExactly(new Consumer[SpanDataAssert] {
|
||||||
|
override def accept(span: SpanDataAssert): Unit = {
|
||||||
|
span.hasName(spanName)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
} finally {
|
||||||
|
binding.unbind()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
def cleanUp(): Unit = {
|
||||||
|
system.terminate()
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue