From bb26c17733f8c8ef3f61ce79254c0a3111a82e4a Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 9 Oct 2020 11:52:20 +0900 Subject: [PATCH] =?UTF-8?q?Don't=20mount=20context=20in=20gRPC=20instrumen?= =?UTF-8?q?tation=20since=20gRPC=20automatically=20=E2=80=A6=20(#1343)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Don't mount context in gRPC instrumentation since gRPC automatically does it, better. * Small cleanup * Try different approach to keep out of bootstrap --- .../gradle/AutoInstrumentationPlugin.java | 71 ++++---- .../v1_5/client/TracingClientInterceptor.java | 18 +- .../v1_5/server/TracingServerInterceptor.java | 47 ++--- .../grpc/v1_5/AbstractGrpcTest.groovy | 162 ++++++++++++++++++ 4 files changed, 229 insertions(+), 69 deletions(-) diff --git a/buildSrc/src/main/java/io/opentelemetry/instrumentation/gradle/AutoInstrumentationPlugin.java b/buildSrc/src/main/java/io/opentelemetry/instrumentation/gradle/AutoInstrumentationPlugin.java index 59312ff29b..bde5cac378 100644 --- a/buildSrc/src/main/java/io/opentelemetry/instrumentation/gradle/AutoInstrumentationPlugin.java +++ b/buildSrc/src/main/java/io/opentelemetry/instrumentation/gradle/AutoInstrumentationPlugin.java @@ -41,15 +41,16 @@ import org.gradle.process.CommandLineArgumentProvider; public class AutoInstrumentationPlugin implements Plugin { /** - * An exact copy of {@code io.opentelemetry.javaagent.tooling.Constants#BOOTSTRAP_PACKAGE_PREFIXES}. We - * can't reference it directly since this file needs to be compiled before the other packages. + * An exact copy of {@code + * io.opentelemetry.javaagent.tooling.Constants#BOOTSTRAP_PACKAGE_PREFIXES}. We can't reference it + * directly since this file needs to be compiled before the other packages. */ public static final String[] BOOTSTRAP_PACKAGE_PREFIXES_COPY = { - "io.opentelemetry.javaagent.common.exec", - "io.opentelemetry.javaagent.slf4j", - "io.opentelemetry.javaagent.bootstrap", - "io.opentelemetry.javaagent.shaded", - "io.opentelemetry.instrumentation.auto.api", + "io.opentelemetry.javaagent.common.exec", + "io.opentelemetry.javaagent.slf4j", + "io.opentelemetry.javaagent.bootstrap", + "io.opentelemetry.javaagent.shaded", + "io.opentelemetry.instrumentation.auto.api", }; // Aditional classes we need only for tests and aren't shared with the agent business logic. @@ -57,36 +58,38 @@ public class AutoInstrumentationPlugin implements Plugin { static { String[] testBS = { - "io.opentelemetry.instrumentation.api", - "io.opentelemetry.OpenTelemetry", // OpenTelemetry API - "io.opentelemetry.common", // OpenTelemetry API - "io.opentelemetry.baggage", // OpenTelemetry API - "io.opentelemetry.context", // OpenTelemetry API (context prop) - "io.opentelemetry.internal", // OpenTelemetry API - "io.opentelemetry.metrics", // OpenTelemetry API - "io.opentelemetry.trace", // OpenTelemetry API - "io.grpc.Context", // OpenTelemetry API dependency - "io.grpc.Deadline", // OpenTelemetry API dependency - "io.grpc.PersistentHashArrayMappedTrie", // OpenTelemetry API dependency - "io.grpc.ThreadLocalContextStorage", // OpenTelemetry API dependency - "org.slf4j", - "ch.qos.logback", - // Tomcat's servlet classes must be on boostrap - // when running tomcat test - "javax.servlet.ServletContainerInitializer", - "javax.servlet.ServletContext" + "io.opentelemetry.instrumentation.api", + "io.opentelemetry.OpenTelemetry", // OpenTelemetry API + "io.opentelemetry.common", // OpenTelemetry API + "io.opentelemetry.baggage", // OpenTelemetry API + "io.opentelemetry.context", // OpenTelemetry API (context prop) + "io.opentelemetry.internal", // OpenTelemetry API + "io.opentelemetry.metrics", // OpenTelemetry API + "io.opentelemetry.trace", // OpenTelemetry API + "io.grpc.Context", // OpenTelemetry API dependency + "io.grpc.Deadline", // OpenTelemetry API dependency + "io.grpc.PersistentHashArrayMappedTrie", // OpenTelemetry API dependency + "io.grpc.ThreadLocalContextStorage", // OpenTelemetry API dependency + "org.slf4j", + "ch.qos.logback", + // Tomcat's servlet classes must be on boostrap + // when running tomcat test + "javax.servlet.ServletContainerInitializer", + "javax.servlet.ServletContext" }; TEST_BOOTSTRAP_PREFIXES = Arrays.copyOf( BOOTSTRAP_PACKAGE_PREFIXES_COPY, BOOTSTRAP_PACKAGE_PREFIXES_COPY.length + testBS.length); - System.arraycopy(testBS, 0, TEST_BOOTSTRAP_PREFIXES, BOOTSTRAP_PACKAGE_PREFIXES_COPY.length, - testBS.length); + System.arraycopy( + testBS, 0, TEST_BOOTSTRAP_PREFIXES, BOOTSTRAP_PACKAGE_PREFIXES_COPY.length, testBS.length); for (int i = 0; i < TEST_BOOTSTRAP_PREFIXES.length; i++) { TEST_BOOTSTRAP_PREFIXES[i] = TEST_BOOTSTRAP_PREFIXES[i].replace('.', '/'); } } + private static final String[] NOT_BOOTSTRAP_PREFIXES = {"io/grpc/Contexts"}; + @Override public void apply(Project project) { project.getPlugins().apply(JavaLibraryPlugin.class); @@ -144,8 +147,10 @@ public class AutoInstrumentationPlugin implements Plugin { }); task.dependsOn(bootstrapJar); - task.getJvmArgumentProviders().add(new InstrumentationTestArgs( - new File(project.getBuildDir(), "libs/" + bootstrapJarName))); + task.getJvmArgumentProviders() + .add( + new InstrumentationTestArgs( + new File(project.getBuildDir(), "libs/" + bootstrapJarName))); }); } @@ -163,11 +168,17 @@ public class AutoInstrumentationPlugin implements Plugin { @Override public Iterable asArguments() { - return Arrays.asList("-Xbootclasspath/a:" + bootstrapJar.getAbsolutePath(), "-Dnet.bytebuddy.raw=true"); + return Arrays.asList( + "-Xbootclasspath/a:" + bootstrapJar.getAbsolutePath(), "-Dnet.bytebuddy.raw=true"); } } private static boolean isBootstrapClass(String filePath) { + for (String notBootstrapName : NOT_BOOTSTRAP_PREFIXES) { + if (filePath.startsWith(notBootstrapName)) { + return false; + } + } for (String testBootstrapPrefix : TEST_BOOTSTRAP_PREFIXES) { if (filePath.startsWith(testBootstrapPrefix)) { return true; diff --git a/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/client/TracingClientInterceptor.java b/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/client/TracingClientInterceptor.java index a04bcf8b8b..aa029f48e8 100644 --- a/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/client/TracingClientInterceptor.java +++ b/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/client/TracingClientInterceptor.java @@ -98,9 +98,8 @@ public class TracingClientInterceptor implements ClientInterceptor { @Override public void start(Listener responseListener, Metadata headers) { OpenTelemetry.getPropagators().getTextMapPropagator().inject(context, headers, SETTER); - try (Scope ignored = withScopedContext(context)) { - super.start( - new TracingClientCallListener<>(responseListener, span, context, tracer), headers); + try { + super.start(new TracingClientCallListener<>(responseListener, span, tracer), headers); } catch (Throwable e) { tracer.endExceptionally(span, e); throw e; @@ -109,7 +108,7 @@ public class TracingClientInterceptor implements ClientInterceptor { @Override public void sendMessage(ReqT message) { - try (Scope ignored = withScopedContext(context)) { + try { super.sendMessage(message); } catch (Throwable e) { tracer.endExceptionally(span, e); @@ -121,16 +120,13 @@ public class TracingClientInterceptor implements ClientInterceptor { static final class TracingClientCallListener extends ForwardingClientCallListener.SimpleForwardingClientCallListener { private final Span span; - private final Context context; private final GrpcClientTracer tracer; private final AtomicLong messageId = new AtomicLong(); - TracingClientCallListener( - Listener delegate, Span span, Context context, GrpcClientTracer tracer) { + TracingClientCallListener(Listener delegate, Span span, GrpcClientTracer tracer) { super(delegate); this.span = span; - this.context = context; this.tracer = tracer; } @@ -143,7 +139,7 @@ public class TracingClientInterceptor implements ClientInterceptor { SemanticAttributes.GRPC_MESSAGE_ID, messageId.incrementAndGet()); span.addEvent("message", attributes); - try (Scope ignored = withScopedContext(context)) { + try { delegate().onMessage(message); } catch (Throwable e) { tracer.addThrowable(span, e); @@ -152,7 +148,7 @@ public class TracingClientInterceptor implements ClientInterceptor { @Override public void onClose(Status status, Metadata trailers) { - try (Scope ignored = withScopedContext(context)) { + try { delegate().onClose(status, trailers); } catch (Throwable e) { tracer.endExceptionally(span, e); @@ -163,7 +159,7 @@ public class TracingClientInterceptor implements ClientInterceptor { @Override public void onReady() { - try (Scope ignored = withScopedContext(context)) { + try { delegate().onReady(); } catch (Throwable e) { tracer.endExceptionally(span, e); diff --git a/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/server/TracingServerInterceptor.java b/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/server/TracingServerInterceptor.java index 2a44758368..680b089f83 100644 --- a/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/server/TracingServerInterceptor.java +++ b/instrumentation/grpc-1.5/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_5/server/TracingServerInterceptor.java @@ -5,8 +5,8 @@ package io.opentelemetry.instrumentation.grpc.v1_5.server; -import static io.opentelemetry.trace.TracingContextUtils.currentContextWith; - +import io.grpc.Context; +import io.grpc.Contexts; import io.grpc.ForwardingServerCall; import io.grpc.ForwardingServerCallListener; import io.grpc.Grpc; @@ -17,10 +17,10 @@ import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.Status; import io.opentelemetry.common.Attributes; -import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.grpc.v1_5.common.GrpcHelper; import io.opentelemetry.trace.Span; import io.opentelemetry.trace.Tracer; +import io.opentelemetry.trace.TracingContextUtils; import io.opentelemetry.trace.attributes.SemanticAttributes; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -62,25 +62,18 @@ public class TracingServerInterceptor implements ServerInterceptor { } GrpcHelper.prepareSpan(span, methodName); - ServerCall.Listener result; - try (Scope ignored = currentContextWith(span)) { + Context context = TracingContextUtils.withSpan(span, Context.current()); - try { - // Wrap the server call so that we can decorate the span - // with the resulting status - TracingServerCall tracingServerCall = - new TracingServerCall<>(call, span, tracer); - - // call other interceptors - result = next.startCall(tracingServerCall, headers); - } catch (Throwable e) { - tracer.endExceptionally(span, e); - throw e; - } + try { + return new TracingServerCallListener<>( + Contexts.interceptCall( + context, new TracingServerCall<>(call, span, tracer), headers, next), + span, + tracer); + } catch (Throwable e) { + tracer.endExceptionally(span, e); + throw e; } - - // This ensures the server implementation can see the span in scope - return new TracingServerCallListener<>(result, span, tracer); } static final class TracingServerCall @@ -97,7 +90,7 @@ public class TracingServerInterceptor implements ServerInterceptor { @Override public void close(Status status, Metadata trailers) { tracer.setStatus(span, status); - try (Scope ignored = currentContextWith(span)) { + try { delegate().close(status, trailers); } catch (Throwable e) { tracer.endExceptionally(span, e); @@ -128,14 +121,12 @@ public class TracingServerInterceptor implements ServerInterceptor { SemanticAttributes.GRPC_MESSAGE_ID, messageId.incrementAndGet()); span.addEvent("message", attributes); - try (Scope ignored = currentContextWith(span)) { - delegate().onMessage(message); - } + delegate().onMessage(message); } @Override public void onHalfClose() { - try (Scope ignored = currentContextWith(span)) { + try { delegate().onHalfClose(); } catch (Throwable e) { tracer.endExceptionally(span, e); @@ -145,7 +136,7 @@ public class TracingServerInterceptor implements ServerInterceptor { @Override public void onCancel() { - try (Scope ignored = currentContextWith(span)) { + try { delegate().onCancel(); span.setAttribute("canceled", true); } catch (Throwable e) { @@ -157,7 +148,7 @@ public class TracingServerInterceptor implements ServerInterceptor { @Override public void onComplete() { - try (Scope ignored = currentContextWith(span)) { + try { delegate().onComplete(); } catch (Throwable e) { tracer.endExceptionally(span, e); @@ -168,7 +159,7 @@ public class TracingServerInterceptor implements ServerInterceptor { @Override public void onReady() { - try (Scope ignored = currentContextWith(span)) { + try { delegate().onReady(); } catch (Throwable e) { tracer.endExceptionally(span, e); diff --git a/instrumentation/grpc-1.5/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_5/AbstractGrpcTest.groovy b/instrumentation/grpc-1.5/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_5/AbstractGrpcTest.groovy index 20b2481b26..26e7cd9cdd 100644 --- a/instrumentation/grpc-1.5/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_5/AbstractGrpcTest.groovy +++ b/instrumentation/grpc-1.5/testing/src/main/groovy/io/opentelemetry/instrumentation/grpc/v1_5/AbstractGrpcTest.groovy @@ -13,18 +13,31 @@ import static io.opentelemetry.trace.Span.Kind.SERVER import example.GreeterGrpc import example.Helloworld import io.grpc.BindableService +import io.grpc.CallOptions +import io.grpc.Channel +import io.grpc.ClientCall +import io.grpc.ClientInterceptor +import io.grpc.Context import io.grpc.ManagedChannel import io.grpc.ManagedChannelBuilder +import io.grpc.Metadata +import io.grpc.MethodDescriptor import io.grpc.Server import io.grpc.ServerBuilder +import io.grpc.ServerCall +import io.grpc.ServerCallHandler +import io.grpc.ServerInterceptor import io.grpc.Status import io.grpc.StatusRuntimeException import io.grpc.stub.StreamObserver import io.opentelemetry.auto.test.InstrumentationSpecification import io.opentelemetry.auto.test.utils.PortUtils import io.opentelemetry.trace.StatusCanonicalCode +import io.opentelemetry.trace.TracingContextUtils import io.opentelemetry.trace.attributes.SemanticAttributes +import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference import spock.lang.Unroll @Unroll @@ -286,4 +299,153 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification { "Status - description" | Status.PERMISSION_DENIED.withDescription("some description") "StatusRuntime - description" | Status.UNIMPLEMENTED.withDescription("some description") } + + def "test user context preserved"() { + setup: + Context.Key key = Context.key("cat") + BindableService greeter = new GreeterGrpc.GreeterImplBase() { + @Override + void sayHello( + final Helloworld.Request req, final StreamObserver responseObserver) { + if (key.get() != "meow") { + responseObserver.onError(new AssertionError((Object) "context not preserved")) + return + } + if (!TracingContextUtils.getSpan(Context.current()).getContext().isValid()) { + responseObserver.onError(new AssertionError((Object) "span not attached")) + return + } + final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build() + responseObserver.onNext(reply) + responseObserver.onCompleted() + } + } + def port = PortUtils.randomOpenPort() + Server server + server = configureServer(ServerBuilder.forPort(port) + .addService(greeter)) + .intercept(new ServerInterceptor() { + @Override + ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { + def ctx = Context.current().withValue(key, "meow") + def oldCtx = ctx.attach() + try { + return next.startCall(call, headers) + } finally { + ctx.detach(oldCtx) + } + } + }) + .build().start() + ManagedChannelBuilder channelBuilder + channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port)) + .intercept(new ClientInterceptor() { + @Override + ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + def ctx = Context.current().withValue(key, "meow") + def oldCtx = ctx.attach() + try { + return next.newCall(method, callOptions) + } finally { + ctx.detach(oldCtx) + } + } + }) + + // Depending on the version of gRPC usePlainText may or may not take an argument. + try { + channelBuilder.usePlaintext() + } catch (MissingMethodException e) { + channelBuilder.usePlaintext(true) + } + ManagedChannel channel = channelBuilder.build() + def client = GreeterGrpc.newStub(channel) + + when: + AtomicReference response = new AtomicReference<>() + AtomicReference error = new AtomicReference<>() + CountDownLatch latch = new CountDownLatch(1) + runUnderTrace("parent") { + client.sayHello( + Helloworld.Request.newBuilder().setName("test").build(), + new StreamObserver() { + @Override + void onNext(Helloworld.Response r) { + if (key.get() != "meow") { + error.set(new AssertionError((Object) "context not preserved")) + return + } + if (!TracingContextUtils.getSpan(Context.current()).getContext().isValid()) { + error.set(new AssertionError((Object) "span not attached")) + return + } + response.set(r) + } + + @Override + void onError(Throwable throwable) { + error.set(throwable) + } + + @Override + void onCompleted() { + latch.countDown() + } + }) + } + + latch.await(10, TimeUnit.SECONDS) + + then: + error.get() == null + response.get().message == "Hello test" + + assertTraces(1) { + trace(0, 3) { + basicSpan(it, 0, "parent") + span(1) { + name "example.Greeter/SayHello" + kind CLIENT + childOf span(0) + errored false + event(0) { + eventName "message" + attributes { + "message.type" "SENT" + "message.id" 1 + } + } + attributes { + "${SemanticAttributes.RPC_SYSTEM.key()}" "grpc" + "${SemanticAttributes.RPC_SERVICE.key()}" "example.Greeter" + "${SemanticAttributes.RPC_METHOD.key()}" "SayHello" + } + } + span(2) { + name "example.Greeter/SayHello" + kind SERVER + childOf span(1) + errored false + event(0) { + eventName "message" + attributes { + "message.type" "RECEIVED" + "message.id" 1 + } + } + attributes { + "${SemanticAttributes.RPC_SYSTEM.key()}" "grpc" + "${SemanticAttributes.RPC_SERVICE.key()}" "example.Greeter" + "${SemanticAttributes.RPC_METHOD.key()}" "SayHello" + "${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1" + "${SemanticAttributes.NET_PEER_PORT.key()}" Long + } + } + } + } + + cleanup: + channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS) + server?.shutdownNow()?.awaitTermination() + } }