Instrument Pekko (by copying from akka) (#9527)
This commit is contained in:
parent
2724a87743
commit
a8cb7a1879
|
@ -84,6 +84,13 @@ final class ExecutorMatchers {
|
|||
"org.eclipse.jetty.util.thread.ReservedThreadExecutor",
|
||||
"org.glassfish.grizzly.threadpool.GrizzlyExecutorService",
|
||||
"org.jboss.threads.EnhancedQueueExecutor",
|
||||
"org.apache.pekko.dispatch.BalancingDispatcher",
|
||||
"org.apache.pekko.dispatch.Dispatcher",
|
||||
"org.apache.pekko.dispatch.Dispatcher$LazyExecutorServiceDelegate",
|
||||
"org.apache.pekko.dispatch.ExecutionContexts$sameThreadExecutionContext$",
|
||||
"org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinPool",
|
||||
"org.apache.pekko.dispatch.MessageDispatcher",
|
||||
"org.apache.pekko.dispatch.PinnedDispatcher",
|
||||
"play.api.libs.streams.Execution$trampoline$",
|
||||
"play.shaded.ahc.io.netty.util.concurrent.ThreadPerTaskExecutor",
|
||||
"scala.concurrent.forkjoin.ForkJoinPool",
|
||||
|
|
|
@ -61,6 +61,8 @@ public class FutureInstrumentation implements TypeInstrumentation {
|
|||
"java.util.concurrent.FutureTask",
|
||||
"java.util.concurrent.RecursiveAction",
|
||||
"java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask",
|
||||
"org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask",
|
||||
"org.apache.pekko.dispatch.Mailbox",
|
||||
"scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask",
|
||||
"scala.concurrent.forkjoin.ForkJoinTask",
|
||||
"scala.concurrent.forkjoin.ForkJoinTask$AdaptedCallable",
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
plugins {
|
||||
id("otel.javaagent-instrumentation")
|
||||
id("otel.scala-conventions")
|
||||
}
|
||||
|
||||
muzzle {
|
||||
pass {
|
||||
group.set("org.apache.pekko")
|
||||
module.set("pekko-actor_2.12")
|
||||
versions.set("[1.0,)")
|
||||
assertInverse.set(true)
|
||||
}
|
||||
pass {
|
||||
group.set("org.apache.pekko")
|
||||
module.set("pekko-actor_2.13")
|
||||
versions.set("[1.0,)")
|
||||
assertInverse.set(true)
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
bootstrap(project(":instrumentation:executors:bootstrap"))
|
||||
|
||||
library("org.apache.pekko:pekko-actor_2.12:1.0.1")
|
||||
|
||||
latestDepTestLibrary("org.apache.pekko:pekko-actor_2.13:+")
|
||||
|
||||
testImplementation(project(":instrumentation:executors:testing"))
|
||||
}
|
||||
|
||||
if (findProperty("testLatestDeps") as Boolean) {
|
||||
configurations {
|
||||
// pekko artifact name is different for regular and latest tests
|
||||
testImplementation {
|
||||
exclude("org.apache.pekko", "pekko-actor_2.12")
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkoactor.v1_0;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.instrumentation.api.util.VirtualField;
|
||||
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
|
||||
import io.opentelemetry.javaagent.bootstrap.executors.TaskAdviceHelper;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import org.apache.pekko.dispatch.Envelope;
|
||||
import org.apache.pekko.dispatch.sysmsg.SystemMessage;
|
||||
|
||||
public class PekkoActorCellInstrumentation implements TypeInstrumentation {
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return named("org.apache.pekko.actor.ActorCell");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("invoke").and(takesArgument(0, named("org.apache.pekko.dispatch.Envelope"))),
|
||||
PekkoActorCellInstrumentation.class.getName() + "$InvokeAdvice");
|
||||
transformer.applyAdviceToMethod(
|
||||
named("systemInvoke")
|
||||
.and(takesArgument(0, named("org.apache.pekko.dispatch.sysmsg.SystemMessage"))),
|
||||
PekkoActorCellInstrumentation.class.getName() + "$SystemInvokeAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class InvokeAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static Scope enter(@Advice.Argument(0) Envelope envelope) {
|
||||
VirtualField<Envelope, PropagatedContext> virtualField =
|
||||
VirtualField.find(Envelope.class, PropagatedContext.class);
|
||||
return TaskAdviceHelper.makePropagatedContextCurrent(virtualField, envelope);
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exit(@Advice.Enter Scope scope) {
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class SystemInvokeAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static Scope enter(@Advice.Argument(0) SystemMessage systemMessage) {
|
||||
VirtualField<SystemMessage, PropagatedContext> virtualField =
|
||||
VirtualField.find(SystemMessage.class, PropagatedContext.class);
|
||||
return TaskAdviceHelper.makePropagatedContextCurrent(virtualField, systemMessage);
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exit(@Advice.Enter Scope scope) {
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkoactor.v1_0;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import java.util.List;
|
||||
|
||||
@AutoService(InstrumentationModule.class)
|
||||
public class PekkoActorInstrumentationModule extends InstrumentationModule {
|
||||
public PekkoActorInstrumentationModule() {
|
||||
super("pekko-actor", "pekko-actor-1.0");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return asList(
|
||||
new PekkoDispatcherInstrumentation(),
|
||||
new PekkoActorCellInstrumentation(),
|
||||
new PekkoDefaultSystemMessageQueueInstrumentation());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkoactor.v1_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.util.VirtualField;
|
||||
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
|
||||
import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper;
|
||||
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import org.apache.pekko.dispatch.sysmsg.SystemMessage;
|
||||
|
||||
public class PekkoDefaultSystemMessageQueueInstrumentation implements TypeInstrumentation {
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return implementsInterface(named("org.apache.pekko.dispatch.DefaultSystemMessageQueue"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher<ClassLoader> classLoaderOptimization() {
|
||||
return hasClassesNamed("org.apache.pekko.dispatch.DefaultSystemMessageQueue");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("systemEnqueue")
|
||||
.and(takesArgument(0, named("org.apache.pekko.actor.ActorRef")))
|
||||
.and(takesArgument(1, named("org.apache.pekko.dispatch.sysmsg.SystemMessage"))),
|
||||
PekkoDefaultSystemMessageQueueInstrumentation.class.getName() + "$DispatchSystemAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class DispatchSystemAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static PropagatedContext enter(@Advice.Argument(1) SystemMessage systemMessage) {
|
||||
Context context = Java8BytecodeBridge.currentContext();
|
||||
if (ExecutorAdviceHelper.shouldPropagateContext(context, systemMessage)) {
|
||||
VirtualField<SystemMessage, PropagatedContext> virtualField =
|
||||
VirtualField.find(SystemMessage.class, PropagatedContext.class);
|
||||
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, systemMessage);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exit(
|
||||
@Advice.Enter PropagatedContext propagatedContext, @Advice.Thrown Throwable throwable) {
|
||||
ExecutorAdviceHelper.cleanUpAfterSubmit(propagatedContext, throwable);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkoactor.v1_0;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.util.VirtualField;
|
||||
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
|
||||
import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper;
|
||||
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import org.apache.pekko.dispatch.Envelope;
|
||||
|
||||
public class PekkoDispatcherInstrumentation implements TypeInstrumentation {
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return named("org.apache.pekko.dispatch.Dispatcher");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("dispatch")
|
||||
.and(takesArgument(0, named("org.apache.pekko.actor.ActorCell")))
|
||||
.and(takesArgument(1, named("org.apache.pekko.dispatch.Envelope"))),
|
||||
PekkoDispatcherInstrumentation.class.getName() + "$DispatchEnvelopeAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class DispatchEnvelopeAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static PropagatedContext enterDispatch(@Advice.Argument(1) Envelope envelope) {
|
||||
Context context = Java8BytecodeBridge.currentContext();
|
||||
if (ExecutorAdviceHelper.shouldPropagateContext(context, envelope.message())) {
|
||||
VirtualField<Envelope, PropagatedContext> virtualField =
|
||||
VirtualField.find(Envelope.class, PropagatedContext.class);
|
||||
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, envelope);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exitDispatch(
|
||||
@Advice.Enter PropagatedContext propagatedContext, @Advice.Thrown Throwable throwable) {
|
||||
ExecutorAdviceHelper.cleanUpAfterSubmit(propagatedContext, throwable);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkoactor.v1_0;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.ignore.IgnoredTypesBuilder;
|
||||
import io.opentelemetry.javaagent.extension.ignore.IgnoredTypesConfigurer;
|
||||
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
|
||||
|
||||
@AutoService(IgnoredTypesConfigurer.class)
|
||||
public class PekkoIgnoredTypesConfigurer implements IgnoredTypesConfigurer {
|
||||
|
||||
@Override
|
||||
public void configure(IgnoredTypesBuilder builder, ConfigProperties config) {
|
||||
// This is a Mailbox created by org.apache.pekko.dispatch.Dispatcher#createMailbox. We must not
|
||||
// add a context to it as context should only be carried by individual envelopes in the queue
|
||||
// of this mailbox.
|
||||
builder.ignoreTaskClass("org.apache.pekko.dispatch.Dispatcher$$anon$1");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,135 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkoactor.v1_0
|
||||
|
||||
import io.opentelemetry.api.common.Attributes
|
||||
import io.opentelemetry.instrumentation.testing.junit.{
|
||||
AgentInstrumentationExtension,
|
||||
InstrumentationExtension
|
||||
}
|
||||
import io.opentelemetry.sdk.testing.assertj.{SpanDataAssert, TraceAssert}
|
||||
import org.junit.jupiter.api.TestInstance
|
||||
import org.junit.jupiter.api.extension.RegisterExtension
|
||||
import org.junit.jupiter.params.ParameterizedTest
|
||||
import org.junit.jupiter.params.provider.ValueSource
|
||||
|
||||
import java.util.function.Consumer
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||
class PekkoActorTest {
|
||||
|
||||
@RegisterExtension val testing: InstrumentationExtension =
|
||||
AgentInstrumentationExtension.create
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(ints = Array(1, 150))
|
||||
def basicTell(count: Int): Unit = {
|
||||
val tester = new PekkoActors
|
||||
(1 to count).foreach { _ =>
|
||||
tester.basicTell()
|
||||
}
|
||||
|
||||
val assertions = (1 to count)
|
||||
.map(_ =>
|
||||
new Consumer[TraceAssert] {
|
||||
override def accept(trace: TraceAssert): Unit =
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
new Consumer[SpanDataAssert] {
|
||||
override def accept(span: SpanDataAssert): Unit = {
|
||||
span
|
||||
.hasName("parent")
|
||||
.hasAttributes(Attributes.empty())
|
||||
}
|
||||
},
|
||||
new Consumer[SpanDataAssert] {
|
||||
override def accept(span: SpanDataAssert): Unit = {
|
||||
span
|
||||
.hasName("Howdy, Pekko")
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributes(Attributes.empty())
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
.asJava
|
||||
|
||||
testing.waitAndAssertTraces(assertions)
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(ints = Array(1, 150))
|
||||
def basicAsk(count: Int): Unit = {
|
||||
val tester = new PekkoActors
|
||||
(1 to count).foreach { _ =>
|
||||
tester.basicAsk()
|
||||
}
|
||||
|
||||
val assertions = (1 to count)
|
||||
.map(_ =>
|
||||
new Consumer[TraceAssert] {
|
||||
override def accept(trace: TraceAssert): Unit =
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
new Consumer[SpanDataAssert] {
|
||||
override def accept(span: SpanDataAssert): Unit = {
|
||||
span
|
||||
.hasName("parent")
|
||||
.hasAttributes(Attributes.empty())
|
||||
}
|
||||
},
|
||||
new Consumer[SpanDataAssert] {
|
||||
override def accept(span: SpanDataAssert): Unit = {
|
||||
span
|
||||
.hasName("Howdy, Pekko")
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributes(Attributes.empty())
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
.asJava
|
||||
|
||||
testing.waitAndAssertTraces(assertions)
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(ints = Array(1, 150))
|
||||
def basicForward(count: Int): Unit = {
|
||||
val tester = new PekkoActors
|
||||
(1 to count).foreach { _ =>
|
||||
tester.basicForward()
|
||||
}
|
||||
|
||||
val assertions = (1 to count)
|
||||
.map(_ =>
|
||||
new Consumer[TraceAssert] {
|
||||
override def accept(trace: TraceAssert): Unit =
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
new Consumer[SpanDataAssert] {
|
||||
override def accept(span: SpanDataAssert): Unit = {
|
||||
span
|
||||
.hasName("parent")
|
||||
.hasAttributes(Attributes.empty())
|
||||
}
|
||||
},
|
||||
new Consumer[SpanDataAssert] {
|
||||
override def accept(span: SpanDataAssert): Unit = {
|
||||
span
|
||||
.hasName("Hello, Pekko")
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributes(Attributes.empty())
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
.asJava
|
||||
|
||||
testing.waitAndAssertTraces(assertions)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,146 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkoactor.v1_0
|
||||
|
||||
import org.apache.pekko.actor.{
|
||||
Actor,
|
||||
ActorLogging,
|
||||
ActorRef,
|
||||
ActorSystem,
|
||||
Props
|
||||
}
|
||||
import org.apache.pekko.pattern.ask
|
||||
import org.apache.pekko.util.Timeout
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry
|
||||
import io.opentelemetry.api.trace.Tracer
|
||||
import io.opentelemetry.javaagent.testing.common.Java8BytecodeBridge
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
// ! == send-message
|
||||
object PekkoActors {
|
||||
val tracer: Tracer = GlobalOpenTelemetry.getTracer("test")
|
||||
|
||||
val system: ActorSystem = ActorSystem("helloPekko")
|
||||
|
||||
val printer: ActorRef = system.actorOf(Receiver.props, "receiverActor")
|
||||
|
||||
val howdyGreeter: ActorRef =
|
||||
system.actorOf(Greeter.props("Howdy", printer), "howdyGreeter")
|
||||
|
||||
val forwarder: ActorRef =
|
||||
system.actorOf(Forwarder.props(printer), "forwarderActor")
|
||||
val helloGreeter: ActorRef =
|
||||
system.actorOf(Greeter.props("Hello", forwarder), "helloGreeter")
|
||||
|
||||
def tracedChild(opName: String): Unit = {
|
||||
tracer.spanBuilder(opName).startSpan().end()
|
||||
}
|
||||
}
|
||||
|
||||
class PekkoActors {
|
||||
|
||||
import PekkoActors._
|
||||
import Greeter._
|
||||
|
||||
implicit val timeout: Timeout = 5.minutes
|
||||
|
||||
def basicTell(): Unit = {
|
||||
val parentSpan = tracer.spanBuilder("parent").startSpan()
|
||||
val parentScope =
|
||||
Java8BytecodeBridge.currentContext().`with`(parentSpan).makeCurrent()
|
||||
try {
|
||||
howdyGreeter ! WhoToGreet("Pekko")
|
||||
howdyGreeter ! Greet
|
||||
} finally {
|
||||
parentSpan.end()
|
||||
parentScope.close()
|
||||
}
|
||||
}
|
||||
|
||||
def basicAsk(): Unit = {
|
||||
val parentSpan = tracer.spanBuilder("parent").startSpan()
|
||||
val parentScope =
|
||||
Java8BytecodeBridge.currentContext().`with`(parentSpan).makeCurrent()
|
||||
try {
|
||||
howdyGreeter ! WhoToGreet("Pekko")
|
||||
howdyGreeter ? Greet
|
||||
} finally {
|
||||
parentSpan.end()
|
||||
parentScope.close()
|
||||
}
|
||||
}
|
||||
|
||||
def basicForward(): Unit = {
|
||||
val parentSpan = tracer.spanBuilder("parent").startSpan()
|
||||
val parentScope =
|
||||
Java8BytecodeBridge.currentContext().`with`(parentSpan).makeCurrent()
|
||||
try {
|
||||
helloGreeter ! WhoToGreet("Pekko")
|
||||
helloGreeter ? Greet
|
||||
} finally {
|
||||
parentSpan.end()
|
||||
parentScope.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object Greeter {
|
||||
def props(message: String, receiverActor: ActorRef): Props =
|
||||
Props(new Greeter(message, receiverActor))
|
||||
|
||||
final case class WhoToGreet(who: String)
|
||||
|
||||
case object Greet
|
||||
|
||||
}
|
||||
|
||||
class Greeter(message: String, receiverActor: ActorRef) extends Actor {
|
||||
|
||||
import Greeter._
|
||||
import Receiver._
|
||||
|
||||
var greeting = ""
|
||||
|
||||
def receive = {
|
||||
case WhoToGreet(who) =>
|
||||
greeting = s"$message, $who"
|
||||
case Greet =>
|
||||
receiverActor ! Greeting(greeting)
|
||||
}
|
||||
}
|
||||
|
||||
object Receiver {
|
||||
def props: Props = Props[Receiver]()
|
||||
|
||||
final case class Greeting(greeting: String)
|
||||
|
||||
}
|
||||
|
||||
class Receiver extends Actor with ActorLogging {
|
||||
|
||||
import Receiver._
|
||||
|
||||
def receive = {
|
||||
case Greeting(greeting) => {
|
||||
PekkoActors.tracedChild(greeting)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
object Forwarder {
|
||||
def props(receiverActor: ActorRef): Props =
|
||||
Props(new Forwarder(receiverActor))
|
||||
}
|
||||
|
||||
class Forwarder(receiverActor: ActorRef) extends Actor with ActorLogging {
|
||||
def receive = {
|
||||
case msg => {
|
||||
receiverActor forward msg
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
plugins {
|
||||
id("otel.javaagent-instrumentation")
|
||||
id("otel.scala-conventions")
|
||||
}
|
||||
|
||||
muzzle {
|
||||
pass {
|
||||
group.set("org.apache.pekko")
|
||||
module.set("pekko-http_2.12")
|
||||
versions.set("[1.0,)")
|
||||
assertInverse.set(true)
|
||||
extraDependency("org.apache.pekko:pekko-stream_2.12:1.0.1")
|
||||
}
|
||||
pass {
|
||||
group.set("org.apache.pekko")
|
||||
module.set("pekko-http_2.13")
|
||||
versions.set("[1.0,)")
|
||||
assertInverse.set(true)
|
||||
extraDependency("org.apache.pekko:pekko-stream_2.13:1.0.1")
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
library("org.apache.pekko:pekko-http_2.12:1.0.0")
|
||||
library("org.apache.pekko:pekko-stream_2.12:1.0.1")
|
||||
|
||||
testInstrumentation(project(":instrumentation:pekko-actor-1.0:javaagent"))
|
||||
testInstrumentation(project(":instrumentation:executors:javaagent"))
|
||||
|
||||
latestDepTestLibrary("org.apache.pekko:pekko-http_2.13:+")
|
||||
latestDepTestLibrary("org.apache.pekko:pekko-stream_2.13:+")
|
||||
}
|
||||
|
||||
tasks {
|
||||
val testStableSemconv by registering(Test::class) {
|
||||
jvmArgs("-Dotel.semconv-stability.opt-in=http")
|
||||
}
|
||||
|
||||
withType<Test>().configureEach {
|
||||
// required on jdk17
|
||||
jvmArgs("--add-exports=java.base/sun.security.util=ALL-UNNAMED")
|
||||
jvmArgs("-XX:+IgnoreUnrecognizedVMOptions")
|
||||
|
||||
jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false")
|
||||
|
||||
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
|
||||
}
|
||||
|
||||
check {
|
||||
dependsOn(testStableSemconv)
|
||||
}
|
||||
}
|
||||
|
||||
if (findProperty("testLatestDeps") as Boolean) {
|
||||
configurations {
|
||||
// pekko artifact name is different for regular and latest tests
|
||||
testImplementation {
|
||||
exclude("org.apache.pekko", "pekko-http_2.12")
|
||||
exclude("org.apache.pekko", "pekko-stream_2.12")
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpRequest;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpResponse;
|
||||
|
||||
public class PekkoHttpUtil {
|
||||
|
||||
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.pekko-http-1.0";
|
||||
|
||||
public static String instrumentationName() {
|
||||
return INSTRUMENTATION_NAME;
|
||||
}
|
||||
|
||||
public static List<String> requestHeader(HttpRequest httpRequest, String name) {
|
||||
return httpRequest
|
||||
.getHeader(name)
|
||||
.map(httpHeader -> Collections.singletonList(httpHeader.value()))
|
||||
.orElse(Collections.emptyList());
|
||||
}
|
||||
|
||||
public static List<String> responseHeader(HttpResponse httpResponse, String name) {
|
||||
return httpResponse
|
||||
.getHeader(name)
|
||||
.map(httpHeader -> Collections.singletonList(httpHeader.value()))
|
||||
.orElse(Collections.emptyList());
|
||||
}
|
||||
|
||||
public static String protocolName(HttpRequest request) {
|
||||
String protocol = request.protocol().value();
|
||||
if (protocol.startsWith("HTTP/")) {
|
||||
return "http";
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static String protocolVersion(HttpRequest request) {
|
||||
String protocol = request.protocol().value();
|
||||
if (protocol.startsWith("HTTP/")) {
|
||||
return protocol.substring("HTTP/".length());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private PekkoHttpUtil() {}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.client;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import scala.concurrent.Future;
|
||||
import scala.concurrent.impl.Promise;
|
||||
import scala.runtime.AbstractFunction1;
|
||||
import scala.util.Try;
|
||||
|
||||
public final class FutureWrapper {
|
||||
|
||||
public static <T> Future<T> wrap(
|
||||
Future<T> future, ExecutionContext executionContext, Context context) {
|
||||
Promise.DefaultPromise<T> promise = new Promise.DefaultPromise<>();
|
||||
future.onComplete(
|
||||
new AbstractFunction1<Try<T>, Object>() {
|
||||
|
||||
@Override
|
||||
public Object apply(Try<T> result) {
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
return promise.complete(result);
|
||||
}
|
||||
}
|
||||
},
|
||||
executionContext);
|
||||
|
||||
return promise;
|
||||
}
|
||||
|
||||
private FutureWrapper() {}
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.client;
|
||||
|
||||
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
|
||||
import static io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.client.PekkoHttpClientSingletons.instrumenter;
|
||||
import static io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.client.PekkoHttpClientSingletons.setter;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import org.apache.pekko.http.scaladsl.HttpExt;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpRequest;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpResponse;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
public class HttpExtClientInstrumentation implements TypeInstrumentation {
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return named("org.apache.pekko.http.scaladsl.HttpExt");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("singleRequest")
|
||||
.and(takesArgument(0, named("org.apache.pekko.http.scaladsl.model.HttpRequest"))),
|
||||
this.getClass().getName() + "$SingleRequestAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class SingleRequestAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void methodEnter(
|
||||
@Advice.Argument(value = 0, readOnly = false) HttpRequest request,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
Context parentContext = currentContext();
|
||||
if (!instrumenter().shouldStart(parentContext, request)) {
|
||||
return;
|
||||
}
|
||||
|
||||
context = instrumenter().start(parentContext, request);
|
||||
scope = context.makeCurrent();
|
||||
// Request is immutable, so we have to assign new value once we update headers
|
||||
request = setter().inject(request);
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void methodExit(
|
||||
@Advice.Argument(0) HttpRequest request,
|
||||
@Advice.This HttpExt thiz,
|
||||
@Advice.Return(readOnly = false) Future<HttpResponse> responseFuture,
|
||||
@Advice.Thrown Throwable throwable,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
if (scope == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
scope.close();
|
||||
if (throwable == null) {
|
||||
responseFuture.onComplete(
|
||||
new OnCompleteHandler(context, request), thiz.system().dispatcher());
|
||||
} else {
|
||||
instrumenter().end(context, request, null, throwable);
|
||||
}
|
||||
if (responseFuture != null) {
|
||||
responseFuture =
|
||||
FutureWrapper.wrap(responseFuture, thiz.system().dispatcher(), currentContext());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.client;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.propagation.ContextPropagators;
|
||||
import io.opentelemetry.context.propagation.TextMapSetter;
|
||||
import org.apache.pekko.http.javadsl.model.headers.RawHeader;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpRequest;
|
||||
|
||||
public class HttpHeaderSetter implements TextMapSetter<HttpHeaderSetter.PekkoHttpHeaders> {
|
||||
|
||||
private final ContextPropagators contextPropagators;
|
||||
|
||||
public HttpHeaderSetter(ContextPropagators contextPropagators) {
|
||||
this.contextPropagators = contextPropagators;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void set(PekkoHttpHeaders carrier, String key, String value) {
|
||||
HttpRequest request = carrier.getRequest();
|
||||
if (request != null) {
|
||||
// It looks like this cast is only needed in Java, Scala would have figured it out
|
||||
carrier.setRequest(
|
||||
(HttpRequest) request.removeHeader(key).addHeader(RawHeader.create(key, value)));
|
||||
}
|
||||
}
|
||||
|
||||
public HttpRequest inject(HttpRequest original) {
|
||||
PekkoHttpHeaders carrier = new PekkoHttpHeaders(original);
|
||||
contextPropagators.getTextMapPropagator().inject(Context.current(), carrier, this);
|
||||
return carrier.getRequest();
|
||||
}
|
||||
|
||||
static class PekkoHttpHeaders {
|
||||
private HttpRequest request;
|
||||
|
||||
public PekkoHttpHeaders(HttpRequest request) {
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
public HttpRequest getRequest() {
|
||||
return request;
|
||||
}
|
||||
|
||||
public void setRequest(HttpRequest request) {
|
||||
this.request = request;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.client;
|
||||
|
||||
import static io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.client.PekkoHttpClientSingletons.instrumenter;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpRequest;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpResponse;
|
||||
import scala.runtime.AbstractFunction1;
|
||||
import scala.util.Try;
|
||||
|
||||
public class OnCompleteHandler extends AbstractFunction1<Try<HttpResponse>, Void> {
|
||||
private final Context context;
|
||||
private final HttpRequest request;
|
||||
|
||||
public OnCompleteHandler(Context context, HttpRequest request) {
|
||||
this.context = context;
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void apply(Try<HttpResponse> result) {
|
||||
if (result.isSuccess()) {
|
||||
instrumenter().end(context, request, result.get(), null);
|
||||
} else {
|
||||
instrumenter().end(context, request, null, result.failed().get());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.client;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesGetter;
|
||||
import io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.PekkoHttpUtil;
|
||||
import java.util.List;
|
||||
import javax.annotation.Nullable;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpRequest;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpResponse;
|
||||
|
||||
class PekkoHttpClientAttributesGetter
|
||||
implements HttpClientAttributesGetter<HttpRequest, HttpResponse> {
|
||||
|
||||
@Override
|
||||
public String getUrlFull(HttpRequest httpRequest) {
|
||||
return httpRequest.uri().toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHttpRequestMethod(HttpRequest httpRequest) {
|
||||
return httpRequest.method().value();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getHttpRequestHeader(HttpRequest httpRequest, String name) {
|
||||
return PekkoHttpUtil.requestHeader(httpRequest, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getHttpResponseStatusCode(
|
||||
HttpRequest httpRequest, HttpResponse httpResponse, @Nullable Throwable error) {
|
||||
return httpResponse.status().intValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getHttpResponseHeader(
|
||||
HttpRequest httpRequest, HttpResponse httpResponse, String name) {
|
||||
return PekkoHttpUtil.responseHeader(httpResponse, name);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getNetworkProtocolName(
|
||||
HttpRequest httpRequest, @Nullable HttpResponse httpResponse) {
|
||||
return PekkoHttpUtil.protocolName(httpRequest);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getNetworkProtocolVersion(
|
||||
HttpRequest httpRequest, @Nullable HttpResponse httpResponse) {
|
||||
return PekkoHttpUtil.protocolVersion(httpRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getServerAddress(HttpRequest httpRequest) {
|
||||
return httpRequest.uri().authority().host().address();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getServerPort(HttpRequest httpRequest) {
|
||||
return httpRequest.uri().authority().port();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.client;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import java.util.List;
|
||||
|
||||
@AutoService(InstrumentationModule.class)
|
||||
public class PekkoHttpClientInstrumentationModule extends InstrumentationModule {
|
||||
public PekkoHttpClientInstrumentationModule() {
|
||||
super("pekko-http", "pekko-http-1.0", "pekko-http-client");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return asList(new HttpExtClientInstrumentation(), new PoolMasterActorInstrumentation());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.client;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientExperimentalMetrics;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientMetrics;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanNameExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.net.PeerServiceAttributesExtractor;
|
||||
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
|
||||
import io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.PekkoHttpUtil;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpRequest;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpResponse;
|
||||
|
||||
public class PekkoHttpClientSingletons {
|
||||
|
||||
private static final HttpHeaderSetter SETTER;
|
||||
private static final Instrumenter<HttpRequest, HttpResponse> INSTRUMENTER;
|
||||
|
||||
static {
|
||||
SETTER = new HttpHeaderSetter(GlobalOpenTelemetry.getPropagators());
|
||||
PekkoHttpClientAttributesGetter httpAttributesGetter = new PekkoHttpClientAttributesGetter();
|
||||
InstrumenterBuilder<HttpRequest, HttpResponse> builder =
|
||||
Instrumenter.<HttpRequest, HttpResponse>builder(
|
||||
GlobalOpenTelemetry.get(),
|
||||
PekkoHttpUtil.instrumentationName(),
|
||||
HttpSpanNameExtractor.create(httpAttributesGetter))
|
||||
.setSpanStatusExtractor(HttpSpanStatusExtractor.create(httpAttributesGetter))
|
||||
.addAttributesExtractor(
|
||||
HttpClientAttributesExtractor.builder(httpAttributesGetter)
|
||||
.setCapturedRequestHeaders(CommonConfig.get().getClientRequestHeaders())
|
||||
.setCapturedResponseHeaders(CommonConfig.get().getClientResponseHeaders())
|
||||
.setKnownMethods(CommonConfig.get().getKnownHttpRequestMethods())
|
||||
.build())
|
||||
.addAttributesExtractor(
|
||||
PeerServiceAttributesExtractor.create(
|
||||
httpAttributesGetter, CommonConfig.get().getPeerServiceMapping()))
|
||||
.addOperationMetrics(HttpClientMetrics.get());
|
||||
if (CommonConfig.get().shouldEmitExperimentalHttpClientMetrics()) {
|
||||
builder.addOperationMetrics(HttpClientExperimentalMetrics.get());
|
||||
}
|
||||
INSTRUMENTER = builder.buildInstrumenter(SpanKindExtractor.alwaysClient());
|
||||
}
|
||||
|
||||
public static Instrumenter<HttpRequest, HttpResponse> instrumenter() {
|
||||
return INSTRUMENTER;
|
||||
}
|
||||
|
||||
public static HttpHeaderSetter setter() {
|
||||
return SETTER;
|
||||
}
|
||||
|
||||
private PekkoHttpClientSingletons() {}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.client;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
public class PoolMasterActorInstrumentation implements TypeInstrumentation {
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return named("org.apache.pekko.http.impl.engine.client.PoolMasterActor");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
// scala compiler mangles method names
|
||||
transformer.applyAdviceToMethod(
|
||||
named("org$apache$pekko$http$impl$engine$client$PoolMasterActor$$startPoolInterface")
|
||||
.or(
|
||||
named(
|
||||
"org$apache$pekko$http$impl$engine$client$PoolMasterActor$$startPoolInterfaceActor")),
|
||||
ClearContextAdvice.class.getName());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class ClearContextAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static Scope enter() {
|
||||
return Java8BytecodeBridge.rootContext().makeCurrent();
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exit(@Advice.Enter Scope scope) {
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import org.apache.pekko.stream.impl.fusing.GraphInterpreter;
|
||||
|
||||
public class GraphInterpreterInstrumentation implements TypeInstrumentation {
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return named("org.apache.pekko.stream.impl.fusing.GraphInterpreter");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("processPush"), GraphInterpreterInstrumentation.class.getName() + "$PushAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class PushAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static Scope onEnter(@Advice.Argument(0) GraphInterpreter.Connection connection) {
|
||||
// processPush is called when execution passes to application or server. Here we propagate the
|
||||
// context to the application code.
|
||||
Context context = PekkoFlowWrapper.getContext(connection.outHandler());
|
||||
if (context != null) {
|
||||
return context.makeCurrent();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void exit(@Advice.Enter Scope scope) {
|
||||
if (scope != null) {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpRequest;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpResponse;
|
||||
import org.apache.pekko.stream.scaladsl.Flow;
|
||||
|
||||
public class HttpExtServerInstrumentation implements TypeInstrumentation {
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return named("org.apache.pekko.http.scaladsl.HttpExt");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("bindAndHandle")
|
||||
.and(takesArgument(0, named("org.apache.pekko.stream.scaladsl.Flow"))),
|
||||
this.getClass().getName() + "$PekkoBindAndHandleAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class PekkoBindAndHandleAdvice {
|
||||
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void wrapHandler(
|
||||
@Advice.Argument(value = 0, readOnly = false) Flow<HttpRequest, HttpResponse, ?> handler) {
|
||||
handler = PekkoFlowWrapper.wrap(handler);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,208 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
|
||||
|
||||
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.javaagent.bootstrap.http.HttpServerResponseCustomizerHolder;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
import java.util.List;
|
||||
import org.apache.pekko.http.javadsl.model.HttpHeader;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpRequest;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpResponse;
|
||||
import org.apache.pekko.stream.Attributes;
|
||||
import org.apache.pekko.stream.BidiShape;
|
||||
import org.apache.pekko.stream.Inlet;
|
||||
import org.apache.pekko.stream.Outlet;
|
||||
import org.apache.pekko.stream.scaladsl.Flow;
|
||||
import org.apache.pekko.stream.stage.AbstractInHandler;
|
||||
import org.apache.pekko.stream.stage.AbstractOutHandler;
|
||||
import org.apache.pekko.stream.stage.GraphStage;
|
||||
import org.apache.pekko.stream.stage.GraphStageLogic;
|
||||
import org.apache.pekko.stream.stage.OutHandler;
|
||||
|
||||
public class PekkoFlowWrapper
|
||||
extends GraphStage<BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest>> {
|
||||
private final Inlet<HttpRequest> requestIn = Inlet.create("otel.requestIn");
|
||||
private final Outlet<HttpRequest> requestOut = Outlet.create("otel.requestOut");
|
||||
private final Inlet<HttpResponse> responseIn = Inlet.create("otel.responseIn");
|
||||
private final Outlet<HttpResponse> responseOut = Outlet.create("otel.responseOut");
|
||||
|
||||
private final BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape =
|
||||
BidiShape.of(responseIn, responseOut, requestIn, requestOut);
|
||||
|
||||
public static Flow<HttpRequest, HttpResponse, ?> wrap(
|
||||
Flow<HttpRequest, HttpResponse, ?> handler) {
|
||||
return handler.join(new PekkoFlowWrapper());
|
||||
}
|
||||
|
||||
public static Context getContext(OutHandler outHandler) {
|
||||
if (outHandler instanceof TracingLogic.ApplicationOutHandler) {
|
||||
// We have multiple requests here only when requests are pipelined on the same connection.
|
||||
// It appears that these requests are processed one by one so processing next request won't
|
||||
// be started before the first one has returned a response, because of this the first request
|
||||
// in the queue is always the one that is currently being processed.
|
||||
TracingRequest request =
|
||||
((TracingLogic.ApplicationOutHandler) outHandler).getRequests().peek();
|
||||
if (request != null) {
|
||||
return request.context;
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape() {
|
||||
return shape;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GraphStageLogic createLogic(Attributes attributes) {
|
||||
return new TracingLogic();
|
||||
}
|
||||
|
||||
private class TracingLogic extends GraphStageLogic {
|
||||
private final Deque<TracingRequest> requests = new ArrayDeque<>();
|
||||
|
||||
public TracingLogic() {
|
||||
super(shape);
|
||||
|
||||
// server pulls response, pass response from user code to server
|
||||
setHandler(
|
||||
responseOut,
|
||||
new AbstractOutHandler() {
|
||||
@Override
|
||||
public void onPull() {
|
||||
pull(responseIn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDownstreamFinish(Throwable cause) {
|
||||
cancel(responseIn);
|
||||
}
|
||||
});
|
||||
|
||||
// user code pulls request, pass request from server to user code
|
||||
setHandler(
|
||||
requestOut,
|
||||
new ApplicationOutHandler() {
|
||||
@Override
|
||||
public void onPull() {
|
||||
pull(requestIn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDownstreamFinish(Throwable cause) {
|
||||
// Invoked on errors. Don't complete this stage to allow error-capturing
|
||||
cancel(requestIn);
|
||||
}
|
||||
});
|
||||
|
||||
// new request from server
|
||||
setHandler(
|
||||
requestIn,
|
||||
new AbstractInHandler() {
|
||||
@Override
|
||||
public void onPush() {
|
||||
HttpRequest request = grab(requestIn);
|
||||
|
||||
TracingRequest tracingRequest = TracingRequest.EMPTY;
|
||||
Context parentContext = currentContext();
|
||||
if (PekkoHttpServerSingletons.instrumenter().shouldStart(parentContext, request)) {
|
||||
Context context =
|
||||
PekkoHttpServerSingletons.instrumenter().start(parentContext, request);
|
||||
tracingRequest = new TracingRequest(context, request);
|
||||
}
|
||||
// event if span wasn't started we need to push TracingRequest to match response
|
||||
// with request
|
||||
requests.push(tracingRequest);
|
||||
|
||||
push(requestOut, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpstreamFinish() {
|
||||
complete(requestOut);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpstreamFailure(Throwable exception) {
|
||||
fail(requestOut, exception);
|
||||
}
|
||||
});
|
||||
|
||||
// response from user code
|
||||
setHandler(
|
||||
responseIn,
|
||||
new AbstractInHandler() {
|
||||
@Override
|
||||
public void onPush() {
|
||||
HttpResponse response = grab(responseIn);
|
||||
|
||||
TracingRequest tracingRequest = requests.poll();
|
||||
if (tracingRequest != null && tracingRequest != TracingRequest.EMPTY) {
|
||||
// pekko response is immutable so the customizer just captures the added headers
|
||||
PekkoHttpResponseMutator responseMutator = new PekkoHttpResponseMutator();
|
||||
HttpServerResponseCustomizerHolder.getCustomizer()
|
||||
.customize(tracingRequest.context, response, responseMutator);
|
||||
// build a new response with the added headers
|
||||
List<HttpHeader> headers = responseMutator.getHeaders();
|
||||
if (!headers.isEmpty()) {
|
||||
response = (HttpResponse) response.addHeaders(headers);
|
||||
}
|
||||
|
||||
PekkoHttpServerSingletons.instrumenter()
|
||||
.end(tracingRequest.context, tracingRequest.request, response, null);
|
||||
}
|
||||
push(responseOut, response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpstreamFailure(Throwable exception) {
|
||||
TracingRequest tracingRequest;
|
||||
while ((tracingRequest = requests.poll()) != null) {
|
||||
if (tracingRequest == TracingRequest.EMPTY) {
|
||||
continue;
|
||||
}
|
||||
PekkoHttpServerSingletons.instrumenter()
|
||||
.end(
|
||||
tracingRequest.context,
|
||||
tracingRequest.request,
|
||||
PekkoHttpServerSingletons.errorResponse(),
|
||||
exception);
|
||||
}
|
||||
|
||||
fail(responseOut, exception);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpstreamFinish() {
|
||||
completeStage();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
abstract class ApplicationOutHandler extends AbstractOutHandler {
|
||||
Deque<TracingRequest> getRequests() {
|
||||
return requests;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class TracingRequest {
|
||||
static final TracingRequest EMPTY = new TracingRequest(null, null);
|
||||
final Context context;
|
||||
final HttpRequest request;
|
||||
|
||||
TracingRequest(Context context, HttpRequest request) {
|
||||
this.context = context;
|
||||
this.request = request;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
|
||||
|
||||
import io.opentelemetry.javaagent.bootstrap.http.HttpServerResponseMutator;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.pekko.http.javadsl.model.HttpHeader;
|
||||
import org.apache.pekko.http.javadsl.model.HttpResponse;
|
||||
import org.apache.pekko.http.javadsl.model.headers.RawHeader;
|
||||
|
||||
final class PekkoHttpResponseMutator implements HttpServerResponseMutator<HttpResponse> {
|
||||
|
||||
private final List<HttpHeader> headers = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public void appendHeader(HttpResponse response, String name, String value) {
|
||||
headers.add(RawHeader.create(name, value));
|
||||
}
|
||||
|
||||
List<HttpHeader> getHeaders() {
|
||||
return headers;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpServerAttributesGetter;
|
||||
import io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.PekkoHttpUtil;
|
||||
import java.util.List;
|
||||
import javax.annotation.Nullable;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpRequest;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpResponse;
|
||||
import org.apache.pekko.http.scaladsl.model.Uri;
|
||||
import scala.Option;
|
||||
|
||||
class PekkoHttpServerAttributesGetter
|
||||
implements HttpServerAttributesGetter<HttpRequest, HttpResponse> {
|
||||
|
||||
@Override
|
||||
public String getHttpRequestMethod(HttpRequest request) {
|
||||
return request.method().value();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getHttpRequestHeader(HttpRequest request, String name) {
|
||||
return PekkoHttpUtil.requestHeader(request, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getHttpResponseStatusCode(
|
||||
HttpRequest request, HttpResponse httpResponse, @Nullable Throwable error) {
|
||||
return httpResponse.status().intValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getHttpResponseHeader(
|
||||
HttpRequest request, HttpResponse httpResponse, String name) {
|
||||
return PekkoHttpUtil.responseHeader(httpResponse, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUrlScheme(HttpRequest request) {
|
||||
return request.uri().scheme();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUrlPath(HttpRequest request) {
|
||||
return request.uri().path().toString();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getUrlQuery(HttpRequest request) {
|
||||
Option<String> queryString = request.uri().rawQueryString();
|
||||
return queryString.isDefined() ? queryString.get() : null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getNetworkProtocolName(HttpRequest request, @Nullable HttpResponse httpResponse) {
|
||||
return PekkoHttpUtil.protocolName(request);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getNetworkProtocolVersion(
|
||||
HttpRequest request, @Nullable HttpResponse httpResponse) {
|
||||
return PekkoHttpUtil.protocolVersion(request);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getServerAddress(HttpRequest request) {
|
||||
Uri.Host host = request.uri().authority().host();
|
||||
return host.isEmpty() ? null : host.address();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getServerPort(HttpRequest request) {
|
||||
return request.uri().authority().port();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
|
||||
|
||||
import io.opentelemetry.context.propagation.TextMapGetter;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
import org.apache.pekko.http.javadsl.model.HttpHeader;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpRequest;
|
||||
|
||||
enum PekkoHttpServerHeaders implements TextMapGetter<HttpRequest> {
|
||||
INSTANCE;
|
||||
|
||||
@Override
|
||||
public Iterable<String> keys(HttpRequest httpRequest) {
|
||||
return StreamSupport.stream(httpRequest.getHeaders().spliterator(), false)
|
||||
.map(HttpHeader::lowercaseName)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String get(HttpRequest carrier, String key) {
|
||||
Optional<HttpHeader> header = carrier.getHeader(key);
|
||||
return header.map(HttpHeader::value).orElse(null);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
|
||||
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
|
||||
import static java.util.Arrays.asList;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import java.util.List;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
@AutoService(InstrumentationModule.class)
|
||||
public class PekkoHttpServerInstrumentationModule extends InstrumentationModule {
|
||||
public PekkoHttpServerInstrumentationModule() {
|
||||
super("pekko-http", "pekko-http-1.0", "pekko-http-server");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
|
||||
// in GraphInterpreterInstrumentation we instrument a class that belongs to pekko-streams, make
|
||||
// sure this runs only when pekko-http is present to avoid muzzle failures
|
||||
return hasClassesNamed("org.apache.pekko.http.scaladsl.HttpExt");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return asList(new HttpExtServerInstrumentation(), new GraphInterpreterInstrumentation());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpServerAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpServerExperimentalMetrics;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpServerMetrics;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpServerRoute;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanNameExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor;
|
||||
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
|
||||
import io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.PekkoHttpUtil;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpRequest;
|
||||
import org.apache.pekko.http.scaladsl.model.HttpResponse;
|
||||
|
||||
public final class PekkoHttpServerSingletons {
|
||||
|
||||
private static final Instrumenter<HttpRequest, HttpResponse> INSTRUMENTER;
|
||||
|
||||
static {
|
||||
PekkoHttpServerAttributesGetter httpAttributesGetter = new PekkoHttpServerAttributesGetter();
|
||||
InstrumenterBuilder<HttpRequest, HttpResponse> builder =
|
||||
Instrumenter.<HttpRequest, HttpResponse>builder(
|
||||
GlobalOpenTelemetry.get(),
|
||||
PekkoHttpUtil.instrumentationName(),
|
||||
HttpSpanNameExtractor.create(httpAttributesGetter))
|
||||
.setSpanStatusExtractor(HttpSpanStatusExtractor.create(httpAttributesGetter))
|
||||
.addAttributesExtractor(
|
||||
HttpServerAttributesExtractor.builder(httpAttributesGetter)
|
||||
.setCapturedRequestHeaders(CommonConfig.get().getServerRequestHeaders())
|
||||
.setCapturedResponseHeaders(CommonConfig.get().getServerResponseHeaders())
|
||||
.setKnownMethods(CommonConfig.get().getKnownHttpRequestMethods())
|
||||
.build())
|
||||
.addOperationMetrics(HttpServerMetrics.get())
|
||||
.addContextCustomizer(
|
||||
HttpServerRoute.builder(httpAttributesGetter)
|
||||
.setKnownMethods(CommonConfig.get().getKnownHttpRequestMethods())
|
||||
.build());
|
||||
if (CommonConfig.get().shouldEmitExperimentalHttpServerMetrics()) {
|
||||
builder.addOperationMetrics(HttpServerExperimentalMetrics.get());
|
||||
}
|
||||
INSTRUMENTER = builder.buildServerInstrumenter(PekkoHttpServerHeaders.INSTANCE);
|
||||
}
|
||||
|
||||
public static Instrumenter<HttpRequest, HttpResponse> instrumenter() {
|
||||
return INSTRUMENTER;
|
||||
}
|
||||
|
||||
public static HttpResponse errorResponse() {
|
||||
return (HttpResponse) HttpResponse.create().withStatus(500);
|
||||
}
|
||||
|
||||
private PekkoHttpServerSingletons() {}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.ignore.IgnoredTypesBuilder;
|
||||
import io.opentelemetry.javaagent.extension.ignore.IgnoredTypesConfigurer;
|
||||
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
|
||||
|
||||
@AutoService(IgnoredTypesConfigurer.class)
|
||||
public class PekkoServerIgnoredTypesConfigurer implements IgnoredTypesConfigurer {
|
||||
|
||||
@Override
|
||||
public void configure(IgnoredTypesBuilder builder, ConfigProperties config) {
|
||||
// in PekkoHttpServerInstrumentationTestAsync http pipeline test sending this message trigger
|
||||
// processing next request, we don't want to propagate context there
|
||||
builder.ignoreTaskClass("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter$AsyncInput");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
pekko.http {
|
||||
host-connection-pool {
|
||||
// Limit maximum http backoff for tests
|
||||
max-connection-backoff = 100ms
|
||||
max-open-requests = 1024
|
||||
max-retries = 0
|
||||
client {
|
||||
connecting-timeout = 5s
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.{
|
||||
AbstractHttpServerTest,
|
||||
HttpServerTestOptions,
|
||||
ServerEndpoint
|
||||
}
|
||||
|
||||
import java.util
|
||||
import java.util.Collections
|
||||
import java.util.function.{Function, Predicate}
|
||||
|
||||
abstract class AbstractHttpServerInstrumentationTest
|
||||
extends AbstractHttpServerTest[Object] {
|
||||
|
||||
override protected def configure(
|
||||
options: HttpServerTestOptions
|
||||
): Unit = {
|
||||
options.setTestCaptureHttpHeaders(false)
|
||||
options.setHttpAttributes(
|
||||
new Function[ServerEndpoint, util.Set[AttributeKey[_]]] {
|
||||
override def apply(v1: ServerEndpoint): util.Set[AttributeKey[_]] =
|
||||
Collections.emptySet()
|
||||
}
|
||||
)
|
||||
options.setHasResponseCustomizer(
|
||||
new Predicate[ServerEndpoint] {
|
||||
override def test(t: ServerEndpoint): Boolean =
|
||||
t != ServerEndpoint.EXCEPTION
|
||||
}
|
||||
)
|
||||
// instrumentation does not create a span at all
|
||||
options.disableTestNonStandardHttpMethod
|
||||
}
|
||||
}
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0
|
||||
|
||||
import org.apache.pekko.actor.ActorSystem
|
||||
import org.apache.pekko.dispatch.ExecutionContexts
|
||||
import org.apache.pekko.http.scaladsl.Http
|
||||
import org.apache.pekko.http.scaladsl.model._
|
||||
import org.apache.pekko.http.javadsl.model.HttpHeader
|
||||
import org.apache.pekko.http.scaladsl.settings.ClientConnectionSettings
|
||||
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
|
||||
import org.apache.pekko.http.scaladsl.settings.ConnectionPoolSettings
|
||||
import org.apache.pekko.stream.ActorMaterializer
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.{
|
||||
AbstractHttpClientTest,
|
||||
HttpClientInstrumentationExtension,
|
||||
HttpClientResult,
|
||||
HttpClientTestOptions,
|
||||
SingleConnection
|
||||
}
|
||||
|
||||
import java.net.URI
|
||||
import java.util
|
||||
import java.util.concurrent.Executor
|
||||
import org.junit.jupiter.api.extension.RegisterExtension
|
||||
|
||||
import java.util.function.BiFunction
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.postfixOps
|
||||
import scala.util.{Failure, Success}
|
||||
|
||||
class PekkoHttpClientInstrumentationTest
|
||||
extends AbstractHttpClientTest[HttpRequest] {
|
||||
|
||||
@RegisterExtension val extension: InstrumentationExtension =
|
||||
HttpClientInstrumentationExtension.forAgent()
|
||||
|
||||
val system: ActorSystem = ActorSystem.create()
|
||||
implicit val materializer: ActorMaterializer =
|
||||
ActorMaterializer.create(system)
|
||||
|
||||
override def buildRequest(
|
||||
method: String,
|
||||
uri: URI,
|
||||
h: util.Map[String, String]
|
||||
): HttpRequest =
|
||||
HttpRequest(HttpMethods.getForKey(method).get)
|
||||
.withUri(uri.toString)
|
||||
.addHeaders(
|
||||
h.entrySet()
|
||||
.asScala
|
||||
.map(e => RawHeader(e.getKey, e.getValue): HttpHeader)
|
||||
.asJava
|
||||
)
|
||||
|
||||
override def sendRequest(
|
||||
request: HttpRequest,
|
||||
method: String,
|
||||
uri: URI,
|
||||
headers: util.Map[String, String]
|
||||
): Int = {
|
||||
val settings = ConnectionPoolSettings(system)
|
||||
.withConnectionSettings(
|
||||
ClientConnectionSettings(system)
|
||||
.withConnectingTimeout(
|
||||
FiniteDuration(
|
||||
AbstractHttpClientTest.CONNECTION_TIMEOUT.toMillis,
|
||||
MILLISECONDS
|
||||
)
|
||||
)
|
||||
.withIdleTimeout(
|
||||
FiniteDuration(
|
||||
AbstractHttpClientTest.READ_TIMEOUT.toMillis,
|
||||
MILLISECONDS
|
||||
)
|
||||
)
|
||||
)
|
||||
val response = Await.result(
|
||||
Http.get(system).singleRequest(request, settings = settings),
|
||||
10 seconds
|
||||
)
|
||||
response.discardEntityBytes(materializer)
|
||||
response.status.intValue()
|
||||
}
|
||||
|
||||
override def sendRequestWithCallback(
|
||||
request: HttpRequest,
|
||||
method: String,
|
||||
uri: URI,
|
||||
headers: util.Map[String, String],
|
||||
requestResult: HttpClientResult
|
||||
): Unit = {
|
||||
implicit val ec: ExecutionContext =
|
||||
ExecutionContexts.fromExecutor(new Executor {
|
||||
override def execute(command: Runnable): Unit = command.run()
|
||||
})
|
||||
Http
|
||||
.get(system)
|
||||
.singleRequest(request)
|
||||
.onComplete {
|
||||
case Success(response: HttpResponse) => {
|
||||
response.discardEntityBytes(materializer)
|
||||
requestResult.complete(response.status.intValue())
|
||||
}
|
||||
case Failure(error) => requestResult.complete(error)
|
||||
}
|
||||
}
|
||||
|
||||
override protected def configure(
|
||||
options: HttpClientTestOptions.Builder
|
||||
): Unit = {
|
||||
options.disableTestRedirects()
|
||||
options.disableTestNonStandardHttpMethod()
|
||||
// singleConnection test would require instrumentation to support requests made through pools
|
||||
// (newHostConnectionPool, superPool, etc), which is currently not supported.
|
||||
options.setSingleConnectionFactory(
|
||||
new BiFunction[String, Integer, SingleConnection] {
|
||||
override def apply(t: String, u: Integer): SingleConnection = null
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.{
|
||||
HttpServerInstrumentationExtension,
|
||||
HttpServerTestOptions
|
||||
}
|
||||
import org.junit.jupiter.api.extension.RegisterExtension
|
||||
|
||||
class PekkoHttpServerInstrumentationTest
|
||||
extends AbstractHttpServerInstrumentationTest {
|
||||
@RegisterExtension val extension: InstrumentationExtension =
|
||||
HttpServerInstrumentationExtension.forAgent()
|
||||
|
||||
override protected def setupServer(): AnyRef = {
|
||||
PekkoHttpTestWebServer.start(port)
|
||||
null
|
||||
}
|
||||
|
||||
override protected def stopServer(server: Object): Unit =
|
||||
PekkoHttpTestWebServer.stop()
|
||||
|
||||
override protected def configure(
|
||||
options: HttpServerTestOptions
|
||||
): Unit = {
|
||||
super.configure(options)
|
||||
// exception doesn't propagate
|
||||
options.setTestException(false)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.{
|
||||
HttpServerInstrumentationExtension,
|
||||
HttpServerTestOptions
|
||||
}
|
||||
import org.junit.jupiter.api.extension.RegisterExtension
|
||||
|
||||
class PekkoHttpServerInstrumentationTestAsync
|
||||
extends AbstractHttpServerInstrumentationTest {
|
||||
|
||||
@RegisterExtension val extension: InstrumentationExtension =
|
||||
HttpServerInstrumentationExtension.forAgent()
|
||||
|
||||
override protected def setupServer(): AnyRef = {
|
||||
PekkoHttpTestAsyncWebServer.start(port)
|
||||
null
|
||||
}
|
||||
|
||||
override protected def stopServer(server: Object): Unit =
|
||||
PekkoHttpTestAsyncWebServer.stop()
|
||||
|
||||
override protected def configure(
|
||||
options: HttpServerTestOptions
|
||||
): Unit = {
|
||||
super.configure(options)
|
||||
options.setTestHttpPipelining(false)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.{
|
||||
HttpServerInstrumentationExtension,
|
||||
HttpServerTestOptions
|
||||
}
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension
|
||||
import org.junit.jupiter.api.extension.RegisterExtension
|
||||
|
||||
class PekkoHttpServerInstrumentationTestSync
|
||||
extends AbstractHttpServerInstrumentationTest {
|
||||
|
||||
@RegisterExtension val extension: InstrumentationExtension =
|
||||
HttpServerInstrumentationExtension.forAgent()
|
||||
|
||||
override protected def setupServer(): AnyRef = {
|
||||
PekkoHttpTestSyncWebServer.start(port)
|
||||
null
|
||||
}
|
||||
|
||||
override protected def stopServer(server: Object): Unit =
|
||||
PekkoHttpTestSyncWebServer.stop()
|
||||
|
||||
override protected def configure(
|
||||
options: HttpServerTestOptions
|
||||
): Unit = {
|
||||
super.configure(options)
|
||||
// FIXME: latest deps does not fill http.status_code
|
||||
options.setTestException(!java.lang.Boolean.getBoolean("testLatestDeps"))
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0
|
||||
|
||||
import org.apache.pekko.actor.ActorSystem
|
||||
import org.apache.pekko.http.scaladsl.Http
|
||||
import org.apache.pekko.http.scaladsl.Http.ServerBinding
|
||||
import org.apache.pekko.http.scaladsl.model.HttpMethods.GET
|
||||
import org.apache.pekko.http.scaladsl.model._
|
||||
import org.apache.pekko.stream.ActorMaterializer
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.{
|
||||
AbstractHttpServerTest,
|
||||
ServerEndpoint
|
||||
}
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint._
|
||||
|
||||
import java.util.function.Supplier
|
||||
import scala.concurrent.{Await, ExecutionContextExecutor, Future}
|
||||
|
||||
object PekkoHttpTestAsyncWebServer {
|
||||
implicit val system: ActorSystem = ActorSystem("my-system")
|
||||
implicit val materializer: ActorMaterializer = ActorMaterializer()
|
||||
// needed for the future flatMap/onComplete in the end
|
||||
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
|
||||
val asyncHandler: HttpRequest => Future[HttpResponse] = {
|
||||
case HttpRequest(GET, uri: Uri, _, _, _) =>
|
||||
Future {
|
||||
val endpoint = ServerEndpoint.forPath(uri.path.toString())
|
||||
AbstractHttpServerTest.controller(
|
||||
endpoint,
|
||||
new Supplier[HttpResponse] {
|
||||
def get(): HttpResponse = {
|
||||
val resp = HttpResponse(status =
|
||||
endpoint.getStatus
|
||||
) // .withHeaders(headers.Type)resp.contentType = "text/plain"
|
||||
endpoint match {
|
||||
case SUCCESS => resp.withEntity(endpoint.getBody)
|
||||
case INDEXED_CHILD =>
|
||||
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
|
||||
override def getParameter(name: String): String =
|
||||
uri.query().get(name).orNull
|
||||
})
|
||||
resp.withEntity("")
|
||||
case QUERY_PARAM => resp.withEntity(uri.queryString().orNull)
|
||||
case REDIRECT =>
|
||||
resp.withHeaders(headers.Location(endpoint.getBody))
|
||||
case ERROR => resp.withEntity(endpoint.getBody)
|
||||
case EXCEPTION => throw new Exception(endpoint.getBody)
|
||||
case _ =>
|
||||
HttpResponse(status = NOT_FOUND.getStatus)
|
||||
.withEntity(NOT_FOUND.getBody)
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private var binding: ServerBinding = _
|
||||
|
||||
def start(port: Int): Unit = synchronized {
|
||||
if (null == binding) {
|
||||
import scala.concurrent.duration._
|
||||
binding = Await.result(
|
||||
Http().bindAndHandleAsync(asyncHandler, "localhost", port),
|
||||
10.seconds
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
def stop(): Unit = synchronized {
|
||||
if (null != binding) {
|
||||
binding.unbind()
|
||||
system.terminate()
|
||||
binding = null
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0
|
||||
|
||||
import org.apache.pekko.actor.ActorSystem
|
||||
import org.apache.pekko.http.scaladsl.Http
|
||||
import org.apache.pekko.http.scaladsl.Http.ServerBinding
|
||||
import org.apache.pekko.http.scaladsl.model.HttpMethods.GET
|
||||
import org.apache.pekko.http.scaladsl.model._
|
||||
import org.apache.pekko.stream.ActorMaterializer
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.{
|
||||
AbstractHttpServerTest,
|
||||
ServerEndpoint
|
||||
}
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint._
|
||||
|
||||
import java.util.function.Supplier
|
||||
import scala.concurrent.Await
|
||||
|
||||
object PekkoHttpTestSyncWebServer {
|
||||
implicit val system = ActorSystem("my-system")
|
||||
implicit val materializer = ActorMaterializer()
|
||||
// needed for the future flatMap/onComplete in the end
|
||||
implicit val executionContext = system.dispatcher
|
||||
val syncHandler: HttpRequest => HttpResponse = {
|
||||
case HttpRequest(GET, uri: Uri, _, _, _) => {
|
||||
val endpoint = ServerEndpoint.forPath(uri.path.toString())
|
||||
AbstractHttpServerTest.controller(
|
||||
endpoint,
|
||||
new Supplier[HttpResponse] {
|
||||
def get(): HttpResponse = {
|
||||
val resp = HttpResponse(status = endpoint.getStatus)
|
||||
endpoint match {
|
||||
case SUCCESS => resp.withEntity(endpoint.getBody)
|
||||
case INDEXED_CHILD =>
|
||||
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
|
||||
override def getParameter(name: String): String =
|
||||
uri.query().get(name).orNull
|
||||
})
|
||||
resp.withEntity("")
|
||||
case QUERY_PARAM => resp.withEntity(uri.queryString().orNull)
|
||||
case REDIRECT =>
|
||||
resp.withHeaders(headers.Location(endpoint.getBody))
|
||||
case ERROR => resp.withEntity(endpoint.getBody)
|
||||
case EXCEPTION => throw new Exception(endpoint.getBody)
|
||||
case _ =>
|
||||
HttpResponse(status = NOT_FOUND.getStatus)
|
||||
.withEntity(NOT_FOUND.getBody)
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private var binding: ServerBinding = null
|
||||
|
||||
def start(port: Int): Unit = synchronized {
|
||||
if (null == binding) {
|
||||
import scala.concurrent.duration._
|
||||
binding = Await.result(
|
||||
Http().bindAndHandleSync(syncHandler, "localhost", port),
|
||||
10.seconds
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
def stop(): Unit = synchronized {
|
||||
if (null != binding) {
|
||||
binding.unbind()
|
||||
system.terminate()
|
||||
binding = null
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0
|
||||
|
||||
import org.apache.pekko.actor.ActorSystem
|
||||
import org.apache.pekko.http.scaladsl.Http
|
||||
import org.apache.pekko.http.scaladsl.Http.ServerBinding
|
||||
import org.apache.pekko.http.scaladsl.model._
|
||||
import org.apache.pekko.http.scaladsl.server.Directives._
|
||||
import org.apache.pekko.http.scaladsl.server.ExceptionHandler
|
||||
import org.apache.pekko.stream.ActorMaterializer
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.{
|
||||
AbstractHttpServerTest,
|
||||
ServerEndpoint
|
||||
}
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint._
|
||||
|
||||
import java.util.function.Supplier
|
||||
import scala.concurrent.Await
|
||||
|
||||
object PekkoHttpTestWebServer {
|
||||
implicit val system = ActorSystem("my-system")
|
||||
implicit val materializer = ActorMaterializer()
|
||||
// needed for the future flatMap/onComplete in the end
|
||||
implicit val executionContext = system.dispatcher
|
||||
|
||||
val exceptionHandler = ExceptionHandler { case ex: Exception =>
|
||||
complete(
|
||||
HttpResponse(status = EXCEPTION.getStatus).withEntity(ex.getMessage)
|
||||
)
|
||||
}
|
||||
|
||||
val route = handleExceptions(exceptionHandler) {
|
||||
extractUri { uri =>
|
||||
val endpoint = ServerEndpoint.forPath(uri.path.toString())
|
||||
complete {
|
||||
AbstractHttpServerTest.controller(
|
||||
endpoint,
|
||||
new Supplier[HttpResponse] {
|
||||
def get(): HttpResponse = {
|
||||
val resp = HttpResponse(status = endpoint.getStatus)
|
||||
endpoint match {
|
||||
case SUCCESS => resp.withEntity(endpoint.getBody)
|
||||
case INDEXED_CHILD =>
|
||||
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
|
||||
override def getParameter(name: String): String =
|
||||
uri.query().get(name).orNull
|
||||
})
|
||||
resp.withEntity("")
|
||||
case QUERY_PARAM => resp.withEntity(uri.queryString().orNull)
|
||||
case REDIRECT =>
|
||||
resp.withHeaders(headers.Location(endpoint.getBody))
|
||||
case ERROR => resp.withEntity(endpoint.getBody)
|
||||
case EXCEPTION => throw new Exception(endpoint.getBody)
|
||||
case _ =>
|
||||
HttpResponse(status = NOT_FOUND.getStatus)
|
||||
.withEntity(NOT_FOUND.getBody)
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private var binding: ServerBinding = null
|
||||
|
||||
def start(port: Int): Unit = synchronized {
|
||||
if (null == binding) {
|
||||
import scala.concurrent.duration._
|
||||
binding =
|
||||
Await.result(Http().bindAndHandle(route, "localhost", port), 10.seconds)
|
||||
}
|
||||
}
|
||||
|
||||
def stop(): Unit = synchronized {
|
||||
if (null != binding) {
|
||||
binding.unbind()
|
||||
system.terminate()
|
||||
binding = null
|
||||
}
|
||||
}
|
||||
}
|
|
@ -409,6 +409,8 @@ include(":instrumentation:oshi:javaagent")
|
|||
include(":instrumentation:oshi:library")
|
||||
include(":instrumentation:oshi:testing")
|
||||
include(":instrumentation:payara:javaagent")
|
||||
include(":instrumentation:pekko-actor-1.0:javaagent")
|
||||
include(":instrumentation:pekko-http-1.0:javaagent")
|
||||
include(":instrumentation:play:play-mvc:play-mvc-2.4:javaagent")
|
||||
include(":instrumentation:play:play-mvc:play-mvc-2.6:javaagent")
|
||||
include(":instrumentation:play:play-ws:play-ws-1.0:javaagent")
|
||||
|
|
Loading…
Reference in New Issue