Instrument akka-http bindAndHandle (#8174)

Resolves
https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/8143
Resolves
https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/6081
Resolves
https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/5137
Using the same approach as in
https://github.com/open-telemetry/opentelemetry-java-instrumentation/pull/6243
and as used by DataDog. Unlike in #6243 this pr does not attempt to
prevent leaking scopes into actors but rather instruments the actor to
reset context to get rid of the leaked scopes (DataDog does the same).
This commit is contained in:
Lauri Tulmin 2023-04-05 17:11:05 +03:00 committed by GitHub
parent 2f0819ae20
commit d87f40c9c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 267 additions and 130 deletions

View File

@ -12,6 +12,7 @@ import akka.dispatch.Envelope;
import akka.dispatch.sysmsg.SystemMessage;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
import io.opentelemetry.javaagent.bootstrap.executors.TaskAdviceHelper;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
@ -52,6 +53,11 @@ public class AkkaActorCellInstrumentation implements TypeInstrumentation {
if (scope != null) {
scope.close();
}
// akka-http instrumentation can leak scopes
// reset the context to clear the leaked scopes
if (Java8BytecodeBridge.currentContext() != Java8BytecodeBridge.rootContext()) {
Java8BytecodeBridge.rootContext().makeCurrent();
}
}
}

View File

@ -34,10 +34,9 @@ dependencies {
library("com.typesafe.akka:akka-http_2.11:10.0.0")
library("com.typesafe.akka:akka-stream_2.11:2.4.14")
// these instrumentations are not needed for the tests to pass
// they are here to test for context leaks
testInstrumentation(project(":instrumentation:akka:akka-actor-2.3:javaagent"))
testInstrumentation(project(":instrumentation:akka:akka-actor-fork-join-2.5:javaagent"))
testInstrumentation(project(":instrumentation:scala-fork-join-2.8:javaagent"))
latestDepTestLibrary("com.typesafe.akka:akka-http_2.13:+")
latestDepTestLibrary("com.typesafe.akka:akka-stream_2.13:+")
@ -48,6 +47,8 @@ tasks.withType<Test>().configureEach {
jvmArgs("--add-exports=java.base/sun.security.util=ALL-UNNAMED")
jvmArgs("-XX:+IgnoreUnrecognizedVMOptions")
jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false")
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
}

View File

@ -0,0 +1,178 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.akkahttp.server;
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.errorResponse;
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.instrumenter;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.Attributes;
import akka.stream.BidiShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.scaladsl.Flow;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.ArrayDeque;
import java.util.Deque;
public class AkkaFlowWrapper
extends GraphStage<BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest>> {
private final Inlet<HttpRequest> requestIn = Inlet.create("otel.requestIn");
private final Outlet<HttpRequest> requestOut = Outlet.create("otel.requestOut");
private final Inlet<HttpResponse> responseIn = Inlet.create("otel.responseIn");
private final Outlet<HttpResponse> responseOut = Outlet.create("otel.responseOut");
private final BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape =
BidiShape.of(responseIn, responseOut, requestIn, requestOut);
public static Flow<HttpRequest, HttpResponse, ?> wrap(
Flow<HttpRequest, HttpResponse, ?> handler) {
return handler.join(new AkkaFlowWrapper());
}
@Override
public BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape() {
return shape;
}
@Override
public GraphStageLogic createLogic(Attributes attributes) {
return new TracingLogic();
}
private class TracingLogic extends GraphStageLogic {
private final Deque<TracingRequest> requests = new ArrayDeque<>();
public TracingLogic() {
super(shape);
// server pulls response, pass response from user code to server
setHandler(
responseOut,
new AbstractOutHandler() {
@Override
public void onPull() {
pull(responseIn);
}
@Override
public void onDownstreamFinish() {
cancel(responseIn);
}
});
// user code pulls request, pass request from server to user code
setHandler(
requestOut,
new AbstractOutHandler() {
@Override
public void onPull() {
pull(requestIn);
}
@Override
public void onDownstreamFinish() {
// Invoked on errors. Don't complete this stage to allow error-capturing
cancel(requestIn);
}
});
// new request from server
setHandler(
requestIn,
new AbstractInHandler() {
@Override
public void onPush() {
HttpRequest request = grab(requestIn);
TracingRequest tracingRequest = TracingRequest.EMPTY;
Context parentContext = currentContext();
if (instrumenter().shouldStart(parentContext, request)) {
Context context = instrumenter().start(parentContext, request);
// scope opened here may leak, actor instrumentation will close it
Scope scope = context.makeCurrent();
tracingRequest = new TracingRequest(context, scope, request);
}
// event if span wasn't started we need to push TracingRequest to match response
// with request
requests.push(tracingRequest);
push(requestOut, request);
}
@Override
public void onUpstreamFinish() {
complete(requestOut);
}
@Override
public void onUpstreamFailure(Throwable exception) {
fail(requestOut, exception);
}
});
// response from user code
setHandler(
responseIn,
new AbstractInHandler() {
@Override
public void onPush() {
HttpResponse response = grab(responseIn);
TracingRequest tracingRequest = requests.poll();
if (tracingRequest != null && tracingRequest != TracingRequest.EMPTY) {
// this may happen on a different thread from the one that opened the scope
// actor instrumentation will take care of the leaked scopes
tracingRequest.scope.close();
instrumenter().end(tracingRequest.context, tracingRequest.request, response, null);
}
push(responseOut, response);
}
@Override
public void onUpstreamFailure(Throwable exception) {
TracingRequest tracingRequest;
while ((tracingRequest = requests.poll()) != null) {
if (tracingRequest == TracingRequest.EMPTY) {
continue;
}
tracingRequest.scope.close();
instrumenter()
.end(
tracingRequest.context, tracingRequest.request, errorResponse(), exception);
}
fail(responseOut, exception);
}
@Override
public void onUpstreamFinish() {
completeStage();
}
});
}
}
private static class TracingRequest {
static final TracingRequest EMPTY = new TracingRequest(null, null, null);
final Context context;
final Scope scope;
final HttpRequest request;
TracingRequest(Context context, Scope scope, HttpRequest request) {
this.context = context;
this.scope = scope;
this.request = request;
}
}
}

View File

@ -5,23 +5,12 @@
package io.opentelemetry.javaagent.instrumentation.akkahttp.server;
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.errorResponse;
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.instrumenter;
import static java.util.Collections.singletonList;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import com.google.auto.service.AutoService;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;
import scala.Function1;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;
@AutoService(InstrumentationModule.class)
public class AkkaHttpServerInstrumentationModule extends InstrumentationModule {
@ -33,73 +22,4 @@ public class AkkaHttpServerInstrumentationModule extends InstrumentationModule {
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new HttpExtServerInstrumentation());
}
public static class SyncWrapper extends AbstractFunction1<HttpRequest, HttpResponse> {
private final Function1<HttpRequest, HttpResponse> userHandler;
public SyncWrapper(Function1<HttpRequest, HttpResponse> userHandler) {
this.userHandler = userHandler;
}
@Override
public HttpResponse apply(HttpRequest request) {
Context parentContext = currentContext();
if (!instrumenter().shouldStart(parentContext, request)) {
return userHandler.apply(request);
}
Context context = instrumenter().start(parentContext, request);
try (Scope ignored = context.makeCurrent()) {
HttpResponse response = userHandler.apply(request);
instrumenter().end(context, request, response, null);
return response;
} catch (Throwable t) {
instrumenter().end(context, request, errorResponse(), t);
throw t;
}
}
}
public static class AsyncWrapper extends AbstractFunction1<HttpRequest, Future<HttpResponse>> {
private final Function1<HttpRequest, Future<HttpResponse>> userHandler;
private final ExecutionContext executionContext;
public AsyncWrapper(
Function1<HttpRequest, Future<HttpResponse>> userHandler,
ExecutionContext executionContext) {
this.userHandler = userHandler;
this.executionContext = executionContext;
}
@Override
public Future<HttpResponse> apply(HttpRequest request) {
Context parentContext = currentContext();
if (!instrumenter().shouldStart(parentContext, request)) {
return userHandler.apply(request);
}
Context context = instrumenter().start(parentContext, request);
try (Scope ignored = context.makeCurrent()) {
return userHandler
.apply(request)
.transform(
new AbstractFunction1<HttpResponse, HttpResponse>() {
@Override
public HttpResponse apply(HttpResponse response) {
instrumenter().end(context, request, response, null);
return response;
}
},
new AbstractFunction1<Throwable, Throwable>() {
@Override
public Throwable apply(Throwable t) {
instrumenter().end(context, request, errorResponse(), t);
return t;
}
},
executionContext);
} catch (Throwable t) {
instrumenter().end(context, request, null, t);
throw t;
}
}
}
}

View File

@ -10,14 +10,12 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.Materializer;
import akka.stream.scaladsl.Flow;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import scala.Function1;
import scala.concurrent.Future;
public class HttpExtServerInstrumentation implements TypeInstrumentation {
@Override
@ -27,42 +25,18 @@ public class HttpExtServerInstrumentation implements TypeInstrumentation {
@Override
public void transform(TypeTransformer transformer) {
// Instrumenting akka-streams bindAndHandle api was previously attempted.
// This proved difficult as there was no clean way to close the async scope
// in the graph logic after the user's request handler completes.
//
// Instead, we're instrumenting the bindAndHandle function helpers by
// wrapping the scala functions with our own handlers.
transformer.applyAdviceToMethod(
named("bindAndHandleSync").and(takesArgument(0, named("scala.Function1"))),
this.getClass().getName() + "$AkkaHttpSyncAdvice");
transformer.applyAdviceToMethod(
named("bindAndHandleAsync").and(takesArgument(0, named("scala.Function1"))),
this.getClass().getName() + "$AkkaHttpAsyncAdvice");
named("bindAndHandle").and(takesArgument(0, named("akka.stream.scaladsl.Flow"))),
this.getClass().getName() + "$AkkaBindAndHandleAdvice");
}
@SuppressWarnings("unused")
public static class AkkaHttpSyncAdvice {
public static class AkkaBindAndHandleAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapHandler(
@Advice.Argument(value = 0, readOnly = false)
Function1<HttpRequest, HttpResponse> handler) {
handler = new AkkaHttpServerInstrumentationModule.SyncWrapper(handler);
}
}
@SuppressWarnings("unused")
public static class AkkaHttpAsyncAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrapHandler(
@Advice.Argument(value = 0, readOnly = false)
Function1<HttpRequest, Future<HttpResponse>> handler,
@Advice.Argument(7) Materializer materializer) {
handler =
new AkkaHttpServerInstrumentationModule.AsyncWrapper(
handler, materializer.executionContext());
@Advice.Argument(value = 0, readOnly = false) Flow<HttpRequest, HttpResponse, ?> handler) {
handler = AkkaFlowWrapper.wrap(handler);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.akkahttp
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension
import io.opentelemetry.instrumentation.testing.junit.http.{
HttpServerInstrumentationExtension,
HttpServerTestOptions
}
import org.junit.jupiter.api.extension.RegisterExtension
class AkkaHttpServerInstrumentationTest
extends AbstractHttpServerInstrumentationTest {
@RegisterExtension val extension: InstrumentationExtension =
HttpServerInstrumentationExtension.forAgent()
override protected def setupServer(): AnyRef = {
AkkaHttpTestWebServer.start(port)
null
}
override protected def stopServer(server: Object): Unit =
AkkaHttpTestWebServer.stop()
override protected def configure(
options: HttpServerTestOptions
): Unit = {
super.configure(options)
// exception doesn't propagate
options.setTestException(false)
}
}

View File

@ -12,11 +12,15 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.ExceptionHandler
import akka.stream.ActorMaterializer
import io.opentelemetry.instrumentation.testing.junit.http.{
AbstractHttpServerTest,
ServerEndpoint
}
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint._
import java.util.function.Supplier
import scala.concurrent.Await
// FIXME: This doesn't work because we don't support bindAndHandle.
object AkkaHttpTestWebServer {
implicit val system = ActorSystem("my-system")
implicit val materializer = ActorMaterializer()
@ -29,21 +33,36 @@ object AkkaHttpTestWebServer {
)
}
val route = { // handleExceptions(exceptionHandler) {
path(SUCCESS.rawPath) {
complete(
HttpResponse(status = SUCCESS.getStatus).withEntity(SUCCESS.getBody)
)
} ~ path(QUERY_PARAM.rawPath) {
complete(
HttpResponse(status = QUERY_PARAM.getStatus).withEntity(SUCCESS.getBody)
)
} ~ path(REDIRECT.rawPath) {
redirect(Uri(REDIRECT.getBody), StatusCodes.Found)
} ~ path(ERROR.rawPath) {
complete(HttpResponse(status = ERROR.getStatus).withEntity(ERROR.getBody))
} ~ path(EXCEPTION.rawPath) {
failWith(new Exception(EXCEPTION.getBody))
val route = handleExceptions(exceptionHandler) {
extractUri { uri =>
val endpoint = ServerEndpoint.forPath(uri.path.toString())
complete {
AbstractHttpServerTest.controller(
endpoint,
new Supplier[HttpResponse] {
def get(): HttpResponse = {
val resp = HttpResponse(status = endpoint.getStatus)
endpoint match {
case SUCCESS => resp.withEntity(endpoint.getBody)
case INDEXED_CHILD =>
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
override def getParameter(name: String): String =
uri.query().get(name).orNull
})
resp.withEntity("")
case QUERY_PARAM => resp.withEntity(uri.queryString().orNull)
case REDIRECT =>
resp.withHeaders(headers.Location(endpoint.getBody))
case ERROR => resp.withEntity(endpoint.getBody)
case EXCEPTION => throw new Exception(endpoint.getBody)
case _ =>
HttpResponse(status = NOT_FOUND.getStatus)
.withEntity(NOT_FOUND.getBody)
}
}
}
)
}
}
}

View File

@ -74,6 +74,10 @@ tasks {
dependsOn(testing.suites)
}
}
withType<Test>().configureEach {
jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false")
}
}
// play-test depends on websocket-client