diff --git a/instrumentation/grpc-1.5/javaagent/grpc-1.5-javaagent.gradle b/instrumentation/grpc-1.5/javaagent/grpc-1.5-javaagent.gradle index 603cb3f081..fde9048a46 100644 --- a/instrumentation/grpc-1.5/javaagent/grpc-1.5-javaagent.gradle +++ b/instrumentation/grpc-1.5/javaagent/grpc-1.5-javaagent.gradle @@ -36,4 +36,10 @@ dependencies { testLibrary group: 'io.grpc', name: 'grpc-stub', version: grpcVersion testImplementation project(':instrumentation:grpc-1.5:testing') -} \ No newline at end of file +} + +test { + // The agent context debug mechanism isn't compatible with the bridge approach which may add a + // gRPC context to the root. + jvmArgs "-Dotel.javaagent.experimental.thread-propagation-debugger.enabled=false" +} diff --git a/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcClientBuilderBuildInstrumentation.java b/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcClientBuilderBuildInstrumentation.java index ebf36b224e..e396dc6014 100644 --- a/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcClientBuilderBuildInstrumentation.java +++ b/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcClientBuilderBuildInstrumentation.java @@ -45,7 +45,7 @@ public class GrpcClientBuilderBuildInstrumentation implements TypeInstrumentatio @Advice.OnMethodEnter(suppress = Throwable.class) public static void addInterceptor( @Advice.FieldValue("interceptors") List interceptors) { - interceptors.add(0, GrpcInterceptors.CLIENT_INTERCEPTOR); + interceptors.add(0, GrpcSingletons.CLIENT_INTERCEPTOR); } } } diff --git a/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcContextInstrumentation.java b/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcContextInstrumentation.java new file mode 100644 index 0000000000..d35dfcf7e3 --- /dev/null +++ b/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcContextInstrumentation.java @@ -0,0 +1,49 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.grpc.v1_5; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isStatic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; + +import io.grpc.Context; +import io.opentelemetry.javaagent.tooling.TypeInstrumentation; +import java.util.Collections; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class GrpcContextInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("io.grpc.Context"); + } + + @Override + public Map, String> transformers() { + return Collections.singletonMap( + isMethod() + .and(isStatic()) + .and(named("storage")) + .and(returns(named("io.grpc.Context$Storage"))), + GrpcContextInstrumentation.class.getName() + "$ContextBridgeAdvice"); + } + + public static class ContextBridgeAdvice { + @Advice.OnMethodEnter(skipOn = Advice.OnDefaultValue.class) + public static Object onEnter() { + return null; + } + + @Advice.OnMethodExit + public static void onExit(@Advice.Return(readOnly = false) Context.Storage storage) { + storage = GrpcSingletons.STORAGE; + } + } +} diff --git a/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcInstrumentationModule.java b/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcInstrumentationModule.java index 50e016e5d7..67aaeaca60 100644 --- a/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcInstrumentationModule.java +++ b/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcInstrumentationModule.java @@ -21,6 +21,8 @@ public class GrpcInstrumentationModule extends InstrumentationModule { @Override public List typeInstrumentations() { return asList( - new GrpcClientBuilderBuildInstrumentation(), new GrpcServerBuilderInstrumentation()); + new GrpcClientBuilderBuildInstrumentation(), + new GrpcContextInstrumentation(), + new GrpcServerBuilderInstrumentation()); } } diff --git a/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcServerBuilderInstrumentation.java b/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcServerBuilderInstrumentation.java index b04047dcbd..9a3624142c 100644 --- a/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcServerBuilderInstrumentation.java +++ b/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcServerBuilderInstrumentation.java @@ -47,7 +47,7 @@ public class GrpcServerBuilderInstrumentation implements TypeInstrumentation { public static void onEnter(@Advice.This ServerBuilder serverBuilder) { int callDepth = CallDepthThreadLocalMap.incrementCallDepth(ServerBuilder.class); if (callDepth == 0) { - serverBuilder.intercept(GrpcInterceptors.SERVER_INTERCEPTOR); + serverBuilder.intercept(GrpcSingletons.SERVER_INTERCEPTOR); } } diff --git a/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcInterceptors.java b/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcSingletons.java similarity index 78% rename from instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcInterceptors.java rename to instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcSingletons.java index c6b460f8c4..8ea7b4f6b8 100644 --- a/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcInterceptors.java +++ b/instrumentation/grpc-1.5/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_5/GrpcSingletons.java @@ -6,13 +6,15 @@ package io.opentelemetry.javaagent.instrumentation.grpc.v1_5; import io.grpc.ClientInterceptor; +import io.grpc.Context; import io.grpc.ServerInterceptor; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.grpc.v1_5.GrpcTracing; +import io.opentelemetry.instrumentation.grpc.v1_5.internal.ContextStorageBridge; -// Holds singleton references to tracers. -public final class GrpcInterceptors { +// Holds singleton references. +public final class GrpcSingletons { private static final GrpcTracing TRACING = GrpcTracing.newBuilder(GlobalOpenTelemetry.get()) .setCaptureExperimentalSpanAttributes( @@ -24,4 +26,6 @@ public final class GrpcInterceptors { public static final ClientInterceptor CLIENT_INTERCEPTOR = TRACING.newClientInterceptor(); public static final ServerInterceptor SERVER_INTERCEPTOR = TRACING.newServerInterceptor(); + + public static final Context.Storage STORAGE = new ContextStorageBridge(); } diff --git a/instrumentation/grpc-1.5/library/grpc-1.5-library.gradle b/instrumentation/grpc-1.5/library/grpc-1.5-library.gradle index 6d5810f251..6af6d05247 100644 --- a/instrumentation/grpc-1.5/library/grpc-1.5-library.gradle +++ b/instrumentation/grpc-1.5/library/grpc-1.5-library.gradle @@ -9,5 +9,6 @@ dependencies { testLibrary group: 'io.grpc', name: 'grpc-protobuf', version: grpcVersion testLibrary group: 'io.grpc', name: 'grpc-stub', version: grpcVersion + testImplementation deps.assertj testImplementation project(':instrumentation:grpc-1.5:testing') -} \ No newline at end of file +} diff --git a/instrumentation/grpc-1.5/library/src/main/java/io/grpc/override/ContextStorageOverride.java b/instrumentation/grpc-1.5/library/src/main/java/io/grpc/override/ContextStorageOverride.java new file mode 100644 index 0000000000..450d3011e2 --- /dev/null +++ b/instrumentation/grpc-1.5/library/src/main/java/io/grpc/override/ContextStorageOverride.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.grpc.override; + +import io.grpc.Context; +import io.opentelemetry.instrumentation.grpc.v1_5.internal.ContextStorageBridge; + +/** + * Override class for gRPC to pick up this class to replace the default {@link Context.Storage} with + * an OpenTelemetry bridge. + */ +public final class ContextStorageOverride extends Context.Storage { + + private static final Context.Storage delegate = new ContextStorageBridge(); + + @Override + public void attach(Context toAttach) { + delegate.attach(toAttach); + } + + @Override + public void detach(Context toDetach, Context toRestore) { + delegate.detach(toDetach, toRestore); + } + + @Override + public Context current() { + return delegate.current(); + } +} diff --git a/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/TracingServerInterceptor.java b/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/TracingServerInterceptor.java index 995869338a..3de4daf6f9 100644 --- a/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/TracingServerInterceptor.java +++ b/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/TracingServerInterceptor.java @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.grpc.v1_5; +import io.grpc.Contexts; import io.grpc.ForwardingServerCall; import io.grpc.ForwardingServerCallListener; import io.grpc.Grpc; @@ -54,7 +55,9 @@ final class TracingServerInterceptor implements ServerInterceptor { try (Scope ignored = context.makeCurrent()) { return new TracingServerCallListener<>( - next.startCall(new TracingServerCall<>(call, context), headers), context); + Contexts.interceptCall( + io.grpc.Context.current(), new TracingServerCall<>(call, context), headers, next), + context); } catch (Throwable e) { tracer.endExceptionally(context, e); throw e; @@ -73,7 +76,7 @@ final class TracingServerInterceptor implements ServerInterceptor { @Override public void close(Status status, Metadata trailers) { tracer.setStatus(context, status); - try (Scope ignored = context.makeCurrent()) { + try { delegate().close(status, trailers); } catch (Throwable e) { tracer.endExceptionally(context, e); @@ -103,14 +106,12 @@ final class TracingServerInterceptor implements ServerInterceptor { GrpcHelper.MESSAGE_ID, messageId.incrementAndGet()); Span.fromContext(context).addEvent("message", attributes); - try (Scope ignored = context.makeCurrent()) { - delegate().onMessage(message); - } + delegate().onMessage(message); } @Override public void onHalfClose() { - try (Scope ignored = context.makeCurrent()) { + try { delegate().onHalfClose(); } catch (Throwable e) { tracer.endExceptionally(context, e); @@ -120,7 +121,7 @@ final class TracingServerInterceptor implements ServerInterceptor { @Override public void onCancel() { - try (Scope ignored = context.makeCurrent()) { + try { delegate().onCancel(); if (captureExperimentalSpanAttributes) { Span.fromContext(context).setAttribute("grpc.canceled", true); @@ -134,7 +135,7 @@ final class TracingServerInterceptor implements ServerInterceptor { @Override public void onComplete() { - try (Scope ignored = context.makeCurrent()) { + try { delegate().onComplete(); } catch (Throwable e) { tracer.endExceptionally(context, e); @@ -145,7 +146,7 @@ final class TracingServerInterceptor implements ServerInterceptor { @Override public void onReady() { - try (Scope ignored = context.makeCurrent()) { + try { delegate().onReady(); } catch (Throwable e) { tracer.endExceptionally(context, e); diff --git a/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/internal/ContextStorageBridge.java b/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/internal/ContextStorageBridge.java new file mode 100644 index 0000000000..7c3c45e968 --- /dev/null +++ b/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/internal/ContextStorageBridge.java @@ -0,0 +1,122 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.grpc.v1_5.internal; + +import io.grpc.Context; +import io.opentelemetry.context.ContextKey; +import io.opentelemetry.context.Scope; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * {@link Context.Storage} override which uses OpenTelemetry context as the backing store. Both gRPC + * and OpenTelemetry contexts refer to each other to ensure that both OTel context propagation + * mechanisms and gRPC context propagation mechanisms can be used interchangably. + */ +public final class ContextStorageBridge extends Context.Storage { + + private static final Logger logger = Logger.getLogger(ContextStorageBridge.class.getName()); + + private static final ContextKey GRPC_CONTEXT = ContextKey.named("grpc-context"); + private static final Context.Key OTEL_CONTEXT = + Context.key("otel-context"); + + // Because the extension point is void, there is no way to return information about the backing + // OpenTelemetry context when attaching gRPC context. So the only option is to have this + // side-channel to keep track of scopes. Because the same context can be attached to multiple + // threads, we must use a ThreadLocal here - on the bright side it means the map doesn't have to + // be concurrent. This will add an additional threadlocal lookup when attaching / detaching gRPC + // context, but not when accessing the current. In many applications, this means a small + // difference + // since those operations are rare, but in highly reactive applications where the overhead of + // ThreadLocal was already a problem, this makes it worse. + private static final ThreadLocal>> contextScopes = + ThreadLocal.withInitial(WeakHashMap::new); + + @Override + public void attach(Context toAttach) { + io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current(); + Context current = otelContext.get(GRPC_CONTEXT); + + if (current == toAttach) { + contextScopes + .get() + .computeIfAbsent(toAttach, unused -> new ArrayDeque<>()) + .addLast(Scope.noop()); + return; + } + + io.opentelemetry.context.Context base = OTEL_CONTEXT.get(toAttach); + final io.opentelemetry.context.Context newOtelContext; + if (base != null) { + // gRPC context which has an OTel context associated with it via a call to + // ContextStorageOverride.current(). Using it as the base allows it to be propagated together + // with the gRPC context. + newOtelContext = base.with(GRPC_CONTEXT, toAttach); + } else { + // gRPC context without an OTel context associated with it. This is only possible when + // attaching a context directly created by Context.ROOT, e.g., Context.ROOT.with(...) which + // is not common. We go ahead and assume the gRPC context can be reset while using the current + // OTel context. + newOtelContext = io.opentelemetry.context.Context.current().with(GRPC_CONTEXT, toAttach); + } + + Scope scope = newOtelContext.makeCurrent(); + contextScopes.get().computeIfAbsent(toAttach, unused -> new ArrayDeque<>()).addLast(scope); + } + + @Override + public void detach(Context toDetach, Context toRestore) { + io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current(); + Context current = otelContext.get(GRPC_CONTEXT); + if (current != toDetach) { + // Log a severe message instead of throwing an exception as the context to attach is assumed + // to be the correct one and the unbalanced state represents a coding mistake in a lower + // layer in the stack that cannot be recovered from here. + logger.log( + Level.SEVERE, + "Context was not attached when detaching", + new Throwable().fillInStackTrace()); + } + Map> contextStacks = contextScopes.get(); + Deque stack = contextStacks.get(toDetach); + Scope scope = stack.pollLast(); + if (scope == null) { + logger.log( + Level.SEVERE, + "Detaching context which was not attached.", + new Throwable().fillInStackTrace()); + } else { + scope.close(); + } + if (stack.isEmpty()) { + contextStacks.remove(toDetach); + } + } + + @Override + public Context current() { + io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current(); + Context current = otelContext.get(GRPC_CONTEXT); + if (current == null) { + return Context.ROOT.withValue(OTEL_CONTEXT, otelContext); + } + // Store the current OTel context in the gRPC context so that gRPC context propagation + // mechanisms will also propagate the OTel context. + io.opentelemetry.context.Context previousOtelContext = OTEL_CONTEXT.get(current); + if (previousOtelContext != otelContext) { + // This context has already been previously attached and associated with an OTel context. Just + // create a new context referring to the current OTel context to reflect the current stack. + // The previous context is unaffected and will continue to live in its own stack. + return current.withValue(OTEL_CONTEXT, otelContext); + } + return current; + } +} diff --git a/instrumentation/grpc-1.5/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_5/ContextBridgeTest.java b/instrumentation/grpc-1.5/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_5/ContextBridgeTest.java new file mode 100644 index 0000000000..0ca1127e65 --- /dev/null +++ b/instrumentation/grpc-1.5/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_5/ContextBridgeTest.java @@ -0,0 +1,122 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.grpc.v1_5; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextKey; +import io.opentelemetry.context.Scope; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class ContextBridgeTest { + private static final ContextKey ANIMAL = ContextKey.named("animal"); + + private static final io.grpc.Context.Key FOOD = io.grpc.Context.key("food"); + private static final io.grpc.Context.Key COUNTRY = io.grpc.Context.key("country"); + + private static ExecutorService otherThread; + + @BeforeAll + static void setUp() { + otherThread = Executors.newSingleThreadExecutor(); + } + + @AfterAll + static void tearDown() { + otherThread.shutdown(); + } + + @Test + void grpcOtelMix() { + io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan"); + assertThat(COUNTRY.get()).isNull(); + io.grpc.Context root = grpcContext.attach(); + try { + assertThat(COUNTRY.get()).isEqualTo("japan"); + try (Scope ignored = Context.current().with(ANIMAL, "cat").makeCurrent()) { + assertThat(Context.current().get(ANIMAL)).isEqualTo("cat"); + assertThat(COUNTRY.get()).isEqualTo("japan"); + + io.grpc.Context context2 = io.grpc.Context.current().withValue(FOOD, "cheese"); + assertThat(FOOD.get()).isNull(); + io.grpc.Context toRestore = context2.attach(); + try { + assertThat(FOOD.get()).isEqualTo("cheese"); + assertThat(COUNTRY.get()).isEqualTo("japan"); + assertThat(Context.current().get(ANIMAL)).isEqualTo("cat"); + } finally { + context2.detach(toRestore); + } + } + } finally { + grpcContext.detach(root); + } + } + + @Test + void grpcWrap() throws Exception { + io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan"); + io.grpc.Context root = grpcContext.attach(); + try { + try (Scope ignored = Context.current().with(ANIMAL, "cat").makeCurrent()) { + assertThat(COUNTRY.get()).isEqualTo("japan"); + assertThat(Context.current().get(ANIMAL)).isEqualTo("cat"); + + AtomicReference grpcValue = new AtomicReference<>(); + AtomicReference otelValue = new AtomicReference<>(); + Runnable runnable = + () -> { + grpcValue.set(COUNTRY.get()); + otelValue.set(Context.current().get(ANIMAL)); + }; + otherThread.submit(runnable).get(); + assertThat(grpcValue).hasValue(null); + assertThat(otelValue).hasValue(null); + + otherThread.submit(io.grpc.Context.current().wrap(runnable)).get(); + assertThat(grpcValue).hasValue("japan"); + assertThat(otelValue).hasValue("cat"); + } + } finally { + grpcContext.detach(root); + } + } + + @Test + void otelWrap() throws Exception { + io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan"); + io.grpc.Context root = grpcContext.attach(); + try { + try (Scope ignored = Context.current().with(ANIMAL, "cat").makeCurrent()) { + assertThat(COUNTRY.get()).isEqualTo("japan"); + assertThat(Context.current().get(ANIMAL)).isEqualTo("cat"); + + AtomicReference grpcValue = new AtomicReference<>(); + AtomicReference otelValue = new AtomicReference<>(); + Runnable runnable = + () -> { + grpcValue.set(COUNTRY.get()); + otelValue.set(Context.current().get(ANIMAL)); + }; + otherThread.submit(runnable).get(); + assertThat(grpcValue).hasValue(null); + assertThat(otelValue).hasValue(null); + + otherThread.submit(Context.current().wrap(runnable)).get(); + assertThat(grpcValue).hasValue("japan"); + assertThat(otelValue).hasValue("cat"); + } + } finally { + grpcContext.detach(root); + } + } +}