From d749ac00917548e845e4d5760becf045e2b0b942 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Tue, 8 Aug 2023 11:00:10 +0300 Subject: [PATCH] Rework grpc cancelation propagation (#8957) --- .../grpc-1.6/javaagent/build.gradle.kts | 6 ++ .../grpc/v1_6/GrpcContextInstrumentation.java | 16 +++-- .../grpc/v1_6/GrpcSingletons.java | 12 +++- .../v1_6/internal/ContextStorageBridge.java | 59 +++++++++++++---- .../grpc/v1_6/AbstractGrpcTest.java | 65 +++++++++++++++++++ 5 files changed, 139 insertions(+), 19 deletions(-) diff --git a/instrumentation/grpc-1.6/javaagent/build.gradle.kts b/instrumentation/grpc-1.6/javaagent/build.gradle.kts index a7e5a2d54e..a25ad45d21 100644 --- a/instrumentation/grpc-1.6/javaagent/build.gradle.kts +++ b/instrumentation/grpc-1.6/javaagent/build.gradle.kts @@ -36,5 +36,11 @@ tasks { jvmArgs("-Dotel.javaagent.experimental.thread-propagation-debugger.enabled=false") jvmArgs("-Dotel.instrumentation.grpc.capture-metadata.client.request=some-client-key") jvmArgs("-Dotel.instrumentation.grpc.capture-metadata.server.request=some-server-key") + + // exclude our grpc library instrumentation, the ContextStorageOverride contained within it + // breaks the tests + classpath = classpath.filter { + !it.absolutePath.contains("opentelemetry-grpc-1.6") + } } } diff --git a/instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcContextInstrumentation.java b/instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcContextInstrumentation.java index aeca7598e3..78fbef2168 100644 --- a/instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcContextInstrumentation.java +++ b/instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcContextInstrumentation.java @@ -36,14 +36,20 @@ public class GrpcContextInstrumentation implements TypeInstrumentation { @SuppressWarnings("unused") public static class ContextBridgeAdvice { - @Advice.OnMethodEnter(skipOn = Advice.OnDefaultValue.class) - public static Object onEnter() { - return null; + @Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class) + public static Context.Storage onEnter() { + return GrpcSingletons.getStorage(); } @Advice.OnMethodExit - public static void onExit(@Advice.Return(readOnly = false) Context.Storage storage) { - storage = GrpcSingletons.STORAGE; + public static void onExit( + @Advice.Return(readOnly = false) Context.Storage storage, + @Advice.Enter Context.Storage ourStorage) { + if (ourStorage != null) { + storage = ourStorage; + } else { + storage = GrpcSingletons.setStorage(storage); + } } } } diff --git a/instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcSingletons.java b/instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcSingletons.java index 0f4e4edb5c..36709d9e7f 100644 --- a/instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcSingletons.java +++ b/instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcSingletons.java @@ -15,6 +15,7 @@ import io.opentelemetry.instrumentation.grpc.v1_6.GrpcTelemetry; import io.opentelemetry.instrumentation.grpc.v1_6.internal.ContextStorageBridge; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; // Holds singleton references. public final class GrpcSingletons { @@ -23,7 +24,7 @@ public final class GrpcSingletons { public static final ServerInterceptor SERVER_INTERCEPTOR; - public static final Context.Storage STORAGE = new ContextStorageBridge(false); + private static final AtomicReference STORAGE_REFERENCE = new AtomicReference<>(); static { boolean experimentalSpanAttributes = @@ -48,5 +49,14 @@ public final class GrpcSingletons { SERVER_INTERCEPTOR = telemetry.newServerInterceptor(); } + public static Context.Storage getStorage() { + return STORAGE_REFERENCE.get(); + } + + public static Context.Storage setStorage(Context.Storage storage) { + STORAGE_REFERENCE.compareAndSet(null, new ContextStorageBridge(storage)); + return getStorage(); + } + private GrpcSingletons() {} } diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/internal/ContextStorageBridge.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/internal/ContextStorageBridge.java index 6dbef26e17..5e82e74745 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/internal/ContextStorageBridge.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/internal/ContextStorageBridge.java @@ -28,11 +28,24 @@ public final class ContextStorageBridge extends Context.Storage { private static final Context.Key OTEL_CONTEXT = Context.key("otel-context"); private static final Context.Key OTEL_SCOPE = Context.key("otel-scope"); + // context attached to original context store + private static final Context.Key ORIGINAL_CONTEXT = Context.key("original-context"); + // context that should be restored in original context store on detach + private static final Context.Key ORIGINAL_TO_RESTORE = + Context.key("original-to-restore"); private final boolean propagateGrpcDeadline; + // original context storage that would have been used when running without agent + private final Context.Storage originalStorage; public ContextStorageBridge(boolean propagateGrpcDeadline) { this.propagateGrpcDeadline = propagateGrpcDeadline; + this.originalStorage = null; + } + + public ContextStorageBridge(Context.Storage originalStorage) { + propagateGrpcDeadline = false; + this.originalStorage = originalStorage; } @Override @@ -45,7 +58,9 @@ public final class ContextStorageBridge extends Context.Storage { } if (current == toAttach) { - return current.withValue(OTEL_SCOPE, Scope.noop()); + Context result = current.withValue(OTEL_SCOPE, Scope.noop()); + result = attachOriginalContextStorage(result); + return result; } io.opentelemetry.context.Context base = OTEL_CONTEXT.get(toAttach); @@ -64,11 +79,28 @@ public final class ContextStorageBridge extends Context.Storage { } Scope scope = newOtelContext.makeCurrent(); - return current.withValue(OTEL_SCOPE, scope); + Context result = current.withValue(OTEL_SCOPE, scope); + result = attachOriginalContextStorage(result); + return result; + } + + private Context attachOriginalContextStorage(Context context) { + Context result = context; + if (originalStorage != null) { + Context originalToRestore = originalStorage.doAttach(result); + result = result.withValues(ORIGINAL_CONTEXT, result, ORIGINAL_TO_RESTORE, originalToRestore); + } + return result; } @Override public void detach(Context toDetach, Context toRestore) { + if (originalStorage != null) { + Context originalContext = ORIGINAL_CONTEXT.get(toRestore); + Context originalToRestore = ORIGINAL_TO_RESTORE.get(toRestore); + originalStorage.detach(originalContext, originalToRestore); + } + Scope scope = OTEL_SCOPE.get(toRestore); if (scope == null) { logger.log( @@ -93,17 +125,18 @@ public final class ContextStorageBridge extends Context.Storage { // create a new context referring to the current OTel context to reflect the current stack. // The previous context is unaffected and will continue to live in its own stack. - if (!propagateGrpcDeadline) { - // Because we are propagating gRPC context via OpenTelemetry here, we may also propagate a - // deadline where it - // wasn't present before. Notably, this could happen with no user intention when using the - // javaagent which will - // add OpenTelemetry propagation automatically, and cause that code to fail with a deadline - // cancellation. While - // ideally we could propagate deadline as well as gRPC intended, we cannot have existing - // code fail because it - // added the javaagent and choose to fork here. - current = current.fork(); + if (!propagateGrpcDeadline && originalStorage != null) { + Context originalCurrent = originalStorage.current(); + // check whether grpc context would have propagated without otel context + if (originalCurrent == null || originalCurrent == Context.ROOT) { + // Because we are propagating gRPC context via OpenTelemetry here, we may also propagate a + // deadline where it wasn't present before. Notably, this could happen with no user + // intention when using the javaagent which will add OpenTelemetry propagation + // automatically, and cause that code to fail with a deadline cancellation. While ideally + // we could propagate deadline as well as gRPC intended, we cannot have existing code fail + // because it added the javaagent and choose to fork here. + current = current.fork(); + } } return current.withValue(OTEL_CONTEXT, otelContext); diff --git a/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java b/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java index 7891d0263d..060b3a0df1 100644 --- a/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java +++ b/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java @@ -59,6 +59,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; @@ -1469,6 +1470,70 @@ public abstract class AbstractGrpcTest { assertThat(error).hasValue(null); } + // Regression test for + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/8923 + @Test + void cancelListenerCalled() throws Exception { + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch cancelLatch = new CountDownLatch(1); + AtomicBoolean cancelCalled = new AtomicBoolean(); + + Server server = + configureServer( + ServerBuilder.forPort(0) + .addService( + new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello( + Helloworld.Request request, + StreamObserver responseObserver) { + startLatch.countDown(); + + io.grpc.Context context = io.grpc.Context.current(); + context.addListener( + context1 -> cancelCalled.set(true), MoreExecutors.directExecutor()); + try { + cancelLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + responseObserver.onNext( + Helloworld.Response.newBuilder() + .setMessage(request.getName()) + .build()); + responseObserver.onCompleted(); + } + })) + .build() + .start(); + ManagedChannel channel = createChannel(server); + closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> server.shutdownNow().awaitTermination()); + + GreeterGrpc.GreeterFutureStub client = GreeterGrpc.newFutureStub(channel); + ListenableFuture future = + client.sayHello(Helloworld.Request.newBuilder().setName("test").build()); + + startLatch.await(10, TimeUnit.SECONDS); + future.cancel(false); + cancelLatch.countDown(); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.CLIENT) + .hasNoParent(), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(0)))); + + assertThat(cancelCalled.get()).isEqualTo(true); + } + @Test void setCapturedRequestMetadata() throws Exception { String metadataAttributePrefix = "rpc.grpc.request.metadata.";