Move Akka and Armeria virtual fields to helpers (#13604)

Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
This commit is contained in:
Jonas Kunz 2025-04-01 14:35:08 +02:00 committed by GitHub
parent 81f3795a77
commit 18155581b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 130 additions and 51 deletions

View File

@ -11,8 +11,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import akka.dispatch.Envelope;
import akka.dispatch.sysmsg.SystemMessage;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
import io.opentelemetry.javaagent.bootstrap.executors.TaskAdviceHelper;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
@ -42,9 +40,8 @@ public class AkkaActorCellInstrumentation implements TypeInstrumentation {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope enter(@Advice.Argument(0) Envelope envelope) {
VirtualField<Envelope, PropagatedContext> virtualField =
VirtualField.find(Envelope.class, PropagatedContext.class);
return TaskAdviceHelper.makePropagatedContextCurrent(virtualField, envelope);
return TaskAdviceHelper.makePropagatedContextCurrent(
VirtualFields.ENVELOPE_PROPAGATED_CONTEXT, envelope);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
@ -60,9 +57,8 @@ public class AkkaActorCellInstrumentation implements TypeInstrumentation {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope enter(@Advice.Argument(0) SystemMessage systemMessage) {
VirtualField<SystemMessage, PropagatedContext> virtualField =
VirtualField.find(SystemMessage.class, PropagatedContext.class);
return TaskAdviceHelper.makePropagatedContextCurrent(virtualField, systemMessage);
return TaskAdviceHelper.makePropagatedContextCurrent(
VirtualFields.SYSTEM_MESSAGE_PROPAGATED_CONTEXT, systemMessage);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)

View File

@ -12,7 +12,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import akka.dispatch.sysmsg.SystemMessage;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
@ -49,9 +48,8 @@ public class AkkaDefaultSystemMessageQueueInstrumentation implements TypeInstrum
public static PropagatedContext enter(@Advice.Argument(1) SystemMessage systemMessage) {
Context context = Java8BytecodeBridge.currentContext();
if (ExecutorAdviceHelper.shouldPropagateContext(context, systemMessage)) {
VirtualField<SystemMessage, PropagatedContext> virtualField =
VirtualField.find(SystemMessage.class, PropagatedContext.class);
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, systemMessage);
return ExecutorAdviceHelper.attachContextToTask(
context, VirtualFields.SYSTEM_MESSAGE_PROPAGATED_CONTEXT, systemMessage);
}
return null;
}
@ -61,10 +59,11 @@ public class AkkaDefaultSystemMessageQueueInstrumentation implements TypeInstrum
@Advice.Argument(1) SystemMessage systemMessage,
@Advice.Enter PropagatedContext propagatedContext,
@Advice.Thrown Throwable throwable) {
VirtualField<SystemMessage, PropagatedContext> virtualField =
VirtualField.find(SystemMessage.class, PropagatedContext.class);
ExecutorAdviceHelper.cleanUpAfterSubmit(
propagatedContext, throwable, virtualField, systemMessage);
propagatedContext,
throwable,
VirtualFields.SYSTEM_MESSAGE_PROPAGATED_CONTEXT,
systemMessage);
}
}
}

View File

@ -10,7 +10,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import akka.dispatch.Envelope;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
@ -43,9 +42,8 @@ public class AkkaDispatcherInstrumentation implements TypeInstrumentation {
public static PropagatedContext enterDispatch(@Advice.Argument(1) Envelope envelope) {
Context context = Java8BytecodeBridge.currentContext();
if (ExecutorAdviceHelper.shouldPropagateContext(context, envelope.message())) {
VirtualField<Envelope, PropagatedContext> virtualField =
VirtualField.find(Envelope.class, PropagatedContext.class);
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, envelope);
return ExecutorAdviceHelper.attachContextToTask(
context, VirtualFields.ENVELOPE_PROPAGATED_CONTEXT, envelope);
}
return null;
}
@ -55,9 +53,8 @@ public class AkkaDispatcherInstrumentation implements TypeInstrumentation {
@Advice.Argument(1) Envelope envelope,
@Advice.Enter PropagatedContext propagatedContext,
@Advice.Thrown Throwable throwable) {
VirtualField<Envelope, PropagatedContext> virtualField =
VirtualField.find(Envelope.class, PropagatedContext.class);
ExecutorAdviceHelper.cleanUpAfterSubmit(propagatedContext, throwable, virtualField, envelope);
ExecutorAdviceHelper.cleanUpAfterSubmit(
propagatedContext, throwable, VirtualFields.ENVELOPE_PROPAGATED_CONTEXT, envelope);
}
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.akkaactor;
import akka.dispatch.Envelope;
import akka.dispatch.sysmsg.SystemMessage;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
public class VirtualFields {
private VirtualFields() {}
public static final VirtualField<Envelope, PropagatedContext> ENVELOPE_PROPAGATED_CONTEXT =
VirtualField.find(Envelope.class, PropagatedContext.class);
public static final VirtualField<SystemMessage, PropagatedContext>
SYSTEM_MESSAGE_PROPAGATED_CONTEXT =
VirtualField.find(SystemMessage.class, PropagatedContext.class);
}

View File

@ -10,7 +10,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import akka.dispatch.forkjoin.ForkJoinTask;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.bootstrap.executors.ExecutorAdviceHelper;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
@ -51,9 +50,8 @@ public class AkkaForkJoinPoolInstrumentation implements TypeInstrumentation {
public static PropagatedContext enterJobSubmit(@Advice.Argument(0) ForkJoinTask<?> task) {
Context context = Java8BytecodeBridge.currentContext();
if (ExecutorAdviceHelper.shouldPropagateContext(context, task)) {
VirtualField<ForkJoinTask<?>, PropagatedContext> virtualField =
VirtualField.find(ForkJoinTask.class, PropagatedContext.class);
return ExecutorAdviceHelper.attachContextToTask(context, virtualField, task);
return ExecutorAdviceHelper.attachContextToTask(
context, VirtualFields.FORK_JOIN_TASK_PROPAGATED_CONTEXT, task);
}
return null;
}
@ -63,9 +61,8 @@ public class AkkaForkJoinPoolInstrumentation implements TypeInstrumentation {
@Advice.Argument(0) ForkJoinTask<?> task,
@Advice.Enter PropagatedContext propagatedContext,
@Advice.Thrown Throwable throwable) {
VirtualField<ForkJoinTask<?>, PropagatedContext> virtualField =
VirtualField.find(ForkJoinTask.class, PropagatedContext.class);
ExecutorAdviceHelper.cleanUpAfterSubmit(propagatedContext, throwable, virtualField, task);
ExecutorAdviceHelper.cleanUpAfterSubmit(
propagatedContext, throwable, VirtualFields.FORK_JOIN_TASK_PROPAGATED_CONTEXT, task);
}
}
}

View File

@ -15,8 +15,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import akka.dispatch.forkjoin.ForkJoinPool;
import akka.dispatch.forkjoin.ForkJoinTask;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
import io.opentelemetry.javaagent.bootstrap.executors.TaskAdviceHelper;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
@ -62,14 +60,13 @@ public class AkkaForkJoinTaskInstrumentation implements TypeInstrumentation {
*/
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope enter(@Advice.This ForkJoinTask<?> thiz) {
VirtualField<ForkJoinTask<?>, PropagatedContext> virtualField =
VirtualField.find(ForkJoinTask.class, PropagatedContext.class);
Scope scope = TaskAdviceHelper.makePropagatedContextCurrent(virtualField, thiz);
Scope scope =
TaskAdviceHelper.makePropagatedContextCurrent(
VirtualFields.FORK_JOIN_TASK_PROPAGATED_CONTEXT, thiz);
if (thiz instanceof Runnable) {
VirtualField<Runnable, PropagatedContext> runnableVirtualField =
VirtualField.find(Runnable.class, PropagatedContext.class);
Scope newScope =
TaskAdviceHelper.makePropagatedContextCurrent(runnableVirtualField, (Runnable) thiz);
TaskAdviceHelper.makePropagatedContextCurrent(
VirtualFields.RUNNABLE_PROPAGATED_CONTEXT, (Runnable) thiz);
if (null != newScope) {
if (null != scope) {
newScope.close();
@ -79,10 +76,9 @@ public class AkkaForkJoinTaskInstrumentation implements TypeInstrumentation {
}
}
if (thiz instanceof Callable) {
VirtualField<Callable<?>, PropagatedContext> callableVirtualField =
VirtualField.find(Callable.class, PropagatedContext.class);
Scope newScope =
TaskAdviceHelper.makePropagatedContextCurrent(callableVirtualField, (Callable<?>) thiz);
TaskAdviceHelper.makePropagatedContextCurrent(
VirtualFields.CALLABLE_PROPAGATED_CONTEXT, (Callable<?>) thiz);
if (null != newScope) {
if (null != scope) {
newScope.close();

View File

@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.akkaactor;
import akka.dispatch.forkjoin.ForkJoinTask;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.bootstrap.executors.PropagatedContext;
import java.util.concurrent.Callable;
public class VirtualFields {
private VirtualFields() {}
public static final VirtualField<ForkJoinTask<?>, PropagatedContext>
FORK_JOIN_TASK_PROPAGATED_CONTEXT =
VirtualField.find(ForkJoinTask.class, PropagatedContext.class);
public static final VirtualField<Runnable, PropagatedContext> RUNNABLE_PROPAGATED_CONTEXT =
VirtualField.find(Runnable.class, PropagatedContext.class);
public static final VirtualField<Callable<?>, PropagatedContext> CALLABLE_PROPAGATED_CONTEXT =
VirtualField.find(Callable.class, PropagatedContext.class);
}

View File

@ -11,7 +11,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import akka.http.scaladsl.model.Uri;
import akka.http.scaladsl.server.PathMatcher;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
@ -41,7 +40,7 @@ public class PathMatcherInstrumentation implements TypeInstrumentation {
@Advice.Argument(0) Uri.Path prefix, @Advice.Return PathMatcher<?> result) {
// store the path being matched inside a VirtualField on the given matcher, so it can be used
// for constructing the route
VirtualField.find(PathMatcher.class, String.class).set(result, prefix.toString());
PathMatcherUtil.setMatched(result, prefix.toString());
}
}
}

View File

@ -12,7 +12,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import akka.http.scaladsl.model.Uri;
import akka.http.scaladsl.server.PathMatcher;
import akka.http.scaladsl.server.PathMatchers;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
@ -48,7 +47,7 @@ public class PathMatcherStaticInstrumentation implements TypeInstrumentation {
}
// if present use the matched path that was remembered in PathMatcherInstrumentation,
// otherwise just use a *
String prefix = VirtualField.find(PathMatcher.class, String.class).get(pathMatcher);
String prefix = PathMatcherUtil.getMatched(pathMatcher);
if (prefix == null) {
if (PathMatchers.Slash$.class == pathMatcher.getClass()) {
prefix = "/";

View File

@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.akkahttp.server.route;
import akka.http.scaladsl.server.PathMatcher;
import io.opentelemetry.instrumentation.api.util.VirtualField;
public class PathMatcherUtil {
private static final VirtualField<PathMatcher<?>, String> PATH_MATCHER_ROUTE =
VirtualField.find(PathMatcher.class, String.class);
public static void setMatched(PathMatcher<?> matcher, String route) {
PATH_MATCHER_ROUTE.set(matcher, route);
}
public static String getMatched(PathMatcher<?> matcher) {
return PATH_MATCHER_ROUTE.get(matcher);
}
private PathMatcherUtil() {}
}

View File

@ -10,7 +10,7 @@ import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import com.linecorp.armeria.server.ServiceRequestContext;
import io.grpc.ServerCall;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.instrumentation.grpc.v1_6.GrpcAuthorityStorage;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
@ -43,7 +43,7 @@ public class ArmeriaServerCallInstrumentation implements TypeInstrumentation {
// ArmeriaServerCall does not implement getAuthority. We will store the value for authority
// header as virtual field, this field is read in grpc instrumentation in
// TracingServerInterceptor
VirtualField.find(ServerCall.class, String.class).set(serverCall, authority);
GrpcAuthorityStorage.setAuthority(serverCall, authority);
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.grpc.v1_6;
import io.grpc.ServerCall;
import io.opentelemetry.instrumentation.api.util.VirtualField;
/**
* In case a {@link ServerCall} implementation does not implement {@link ServerCall#getAuthority()}
* like armeria, this utility class should be used to provide the authority instead
*/
public class GrpcAuthorityStorage {
private static final VirtualField<ServerCall<?, ?>, String> AUTHORITY_FIELD =
VirtualField.find(ServerCall.class, String.class);
private GrpcAuthorityStorage() {}
public static void setAuthority(ServerCall<?, ?> call, String authority) {
AUTHORITY_FIELD.set(call, authority);
}
static String getAuthority(ServerCall<?, ?> call) {
return AUTHORITY_FIELD.get(call);
}
}

View File

@ -20,7 +20,6 @@ import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
final class TracingServerInterceptor implements ServerInterceptor {
@ -46,9 +45,6 @@ final class TracingServerInterceptor implements ServerInterceptor {
private static final AtomicLongFieldUpdater<TracingServerCall> RECEIVED_MESSAGE_ID_UPDATER =
AtomicLongFieldUpdater.newUpdater(TracingServerCall.class, "receivedMessageId");
private static final VirtualField<ServerCall<?, ?>, String> AUTHORITY_FIELD =
VirtualField.find(ServerCall.class, String.class);
private final Instrumenter<GrpcRequest, Status> instrumenter;
private final boolean captureExperimentalSpanAttributes;
private final boolean emitMessageEvents;
@ -72,7 +68,7 @@ final class TracingServerInterceptor implements ServerInterceptor {
// Armeria grpc server call does not implement getAuthority(). In
// ArmeriaServerCallInstrumentation we store the value for the authority header in a virtual
// field.
authority = AUTHORITY_FIELD.get(call);
authority = GrpcAuthorityStorage.getAuthority(call);
}
GrpcRequest request =
new GrpcRequest(