From 1d9c23bfb3d52a8a0fca81a3da4fe3c389198f87 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Sat, 12 Mar 2022 04:06:26 +0900 Subject: [PATCH] Do not propagate gRPC deadline when propagating OTel context via javaagent. (#5543) * Add test for early return in gRPC pattern. * Do not propagate gRPC deadline when propagating OTel context via javaagent. --- .../grpc/v1_6/GrpcSingletons.java | 2 +- .../grpc/override/ContextStorageOverride.java | 2 +- .../v1_6/internal/ContextStorageBridge.java | 20 +++++ .../grpc/v1_6/AbstractGrpcTest.java | 83 +++++++++++++++++++ 4 files changed, 105 insertions(+), 2 deletions(-) diff --git a/instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcSingletons.java b/instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcSingletons.java index 6095e60450..188613d6a7 100644 --- a/instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcSingletons.java +++ b/instrumentation/grpc-1.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/grpc/v1_6/GrpcSingletons.java @@ -20,7 +20,7 @@ public final class GrpcSingletons { public static final ServerInterceptor SERVER_INTERCEPTOR; - public static final Context.Storage STORAGE = new ContextStorageBridge(); + public static final Context.Storage STORAGE = new ContextStorageBridge(false); static { boolean experimentalSpanAttributes = diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/grpc/override/ContextStorageOverride.java b/instrumentation/grpc-1.6/library/src/main/java/io/grpc/override/ContextStorageOverride.java index ac20ff0251..9a2b1af92f 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/grpc/override/ContextStorageOverride.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/grpc/override/ContextStorageOverride.java @@ -14,7 +14,7 @@ import io.opentelemetry.instrumentation.grpc.v1_6.internal.ContextStorageBridge; */ public final class ContextStorageOverride extends Context.Storage { - private static final Context.Storage delegate = new ContextStorageBridge(); + private static final Context.Storage delegate = new ContextStorageBridge(true); @Override public Context doAttach(Context toAttach) { diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/internal/ContextStorageBridge.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/internal/ContextStorageBridge.java index c2c80ea64e..e0657bcee0 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/internal/ContextStorageBridge.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/internal/ContextStorageBridge.java @@ -28,6 +28,12 @@ public final class ContextStorageBridge extends Context.Storage { Context.key("otel-context"); private static final Context.Key OTEL_SCOPE = Context.key("otel-scope"); + private final boolean propagateGrpcDeadline; + + public ContextStorageBridge(boolean propagateGrpcDeadline) { + this.propagateGrpcDeadline = propagateGrpcDeadline; + } + @Override public Context doAttach(Context toAttach) { io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current(); @@ -87,6 +93,20 @@ public final class ContextStorageBridge extends Context.Storage { // This context has already been previously attached and associated with an OTel context. Just // create a new context referring to the current OTel context to reflect the current stack. // The previous context is unaffected and will continue to live in its own stack. + + if (!propagateGrpcDeadline) { + // Because we are propagating gRPC context via OpenTelemetry here, we may also propagate a + // deadline where it + // wasn't present before. Notably, this could happen with no user intention when using the + // javaagent which will + // add OpenTelemetry propagation automatically, and cause that code to fail with a deadline + // cancellation. While + // ideally we could propagate deadline as well as gRPC intended, we cannot have existing + // code fail because it + // added the javaagent and choose to fork here. + current = current.fork(); + } + return current.withValue(OTEL_CONTEXT, otelContext); } return current; diff --git a/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java b/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java index 7d5d151059..62c8a64ad7 100644 --- a/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java +++ b/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java @@ -49,6 +49,8 @@ import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; @@ -1370,6 +1372,87 @@ public abstract class AbstractGrpcTest { SemanticAttributes.MESSAGE_ID, 2L)))))); } + // Regression test for + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4169 + @Test + void clientCallAfterServerCompleted() throws Exception { + Server backend = + configureServer( + ServerBuilder.forPort(0) + .addService( + new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello( + Helloworld.Request request, + StreamObserver responseObserver) { + responseObserver.onNext( + Helloworld.Response.newBuilder() + .setMessage(request.getName()) + .build()); + responseObserver.onCompleted(); + } + })) + .build() + .start(); + ManagedChannel backendChannel = createChannel(backend); + closer.add(() -> backendChannel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> backend.shutdownNow().awaitTermination()); + GreeterGrpc.GreeterBlockingStub backendStub = GreeterGrpc.newBlockingStub(backendChannel); + + // This executor does not propagate context without the javaagent available. + ExecutorService executor = Executors.newFixedThreadPool(1); + closer.add(executor::shutdownNow); + + CountDownLatch clientCallDone = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + + Server frontend = + configureServer( + ServerBuilder.forPort(0) + .addService( + new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello( + Helloworld.Request request, + StreamObserver responseObserver) { + responseObserver.onNext( + Helloworld.Response.newBuilder() + .setMessage(request.getName()) + .build()); + responseObserver.onCompleted(); + + executor.execute( + () -> { + try { + backendStub.sayHello(request); + } catch (Throwable t) { + error.set(t); + } + clientCallDone.countDown(); + }); + } + })) + .build() + .start(); + ManagedChannel frontendChannel = createChannel(frontend); + closer.add(() -> frontendChannel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> frontend.shutdownNow().awaitTermination()); + + GreeterGrpc.GreeterBlockingStub frontendStub = GreeterGrpc.newBlockingStub(frontendChannel); + frontendStub.sayHello(Helloworld.Request.newBuilder().setName("test").build()); + + // We don't assert on telemetry - the intention of this test is to verify that adding + // instrumentation, either as + // library or javaagent, does not cause exceptions in the business logic. The produced telemetry + // will be different + // for the two cases due to lack of context propagation in the library case, but that isn't what + // we're testing here. + + clientCallDone.await(10, TimeUnit.SECONDS); + + assertThat(error).hasValue(null); + } + private ManagedChannel createChannel(Server server) throws Exception { ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", server.getPort()));