Don't mount context in gRPC instrumentation since gRPC automatically … (#1343)

* Don't mount context in gRPC instrumentation since gRPC automatically does it, better.

* Small cleanup

* Try different approach to keep out of bootstrap
This commit is contained in:
Anuraag Agrawal 2020-10-09 11:52:20 +09:00 committed by GitHub
parent ce4414e9dd
commit bb26c17733
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 229 additions and 69 deletions

View File

@ -41,15 +41,16 @@ import org.gradle.process.CommandLineArgumentProvider;
public class AutoInstrumentationPlugin implements Plugin<Project> {
/**
* An exact copy of {@code io.opentelemetry.javaagent.tooling.Constants#BOOTSTRAP_PACKAGE_PREFIXES}. We
* can't reference it directly since this file needs to be compiled before the other packages.
* An exact copy of {@code
* io.opentelemetry.javaagent.tooling.Constants#BOOTSTRAP_PACKAGE_PREFIXES}. We can't reference it
* directly since this file needs to be compiled before the other packages.
*/
public static final String[] BOOTSTRAP_PACKAGE_PREFIXES_COPY = {
"io.opentelemetry.javaagent.common.exec",
"io.opentelemetry.javaagent.slf4j",
"io.opentelemetry.javaagent.bootstrap",
"io.opentelemetry.javaagent.shaded",
"io.opentelemetry.instrumentation.auto.api",
"io.opentelemetry.javaagent.common.exec",
"io.opentelemetry.javaagent.slf4j",
"io.opentelemetry.javaagent.bootstrap",
"io.opentelemetry.javaagent.shaded",
"io.opentelemetry.instrumentation.auto.api",
};
// Aditional classes we need only for tests and aren't shared with the agent business logic.
@ -57,36 +58,38 @@ public class AutoInstrumentationPlugin implements Plugin<Project> {
static {
String[] testBS = {
"io.opentelemetry.instrumentation.api",
"io.opentelemetry.OpenTelemetry", // OpenTelemetry API
"io.opentelemetry.common", // OpenTelemetry API
"io.opentelemetry.baggage", // OpenTelemetry API
"io.opentelemetry.context", // OpenTelemetry API (context prop)
"io.opentelemetry.internal", // OpenTelemetry API
"io.opentelemetry.metrics", // OpenTelemetry API
"io.opentelemetry.trace", // OpenTelemetry API
"io.grpc.Context", // OpenTelemetry API dependency
"io.grpc.Deadline", // OpenTelemetry API dependency
"io.grpc.PersistentHashArrayMappedTrie", // OpenTelemetry API dependency
"io.grpc.ThreadLocalContextStorage", // OpenTelemetry API dependency
"org.slf4j",
"ch.qos.logback",
// Tomcat's servlet classes must be on boostrap
// when running tomcat test
"javax.servlet.ServletContainerInitializer",
"javax.servlet.ServletContext"
"io.opentelemetry.instrumentation.api",
"io.opentelemetry.OpenTelemetry", // OpenTelemetry API
"io.opentelemetry.common", // OpenTelemetry API
"io.opentelemetry.baggage", // OpenTelemetry API
"io.opentelemetry.context", // OpenTelemetry API (context prop)
"io.opentelemetry.internal", // OpenTelemetry API
"io.opentelemetry.metrics", // OpenTelemetry API
"io.opentelemetry.trace", // OpenTelemetry API
"io.grpc.Context", // OpenTelemetry API dependency
"io.grpc.Deadline", // OpenTelemetry API dependency
"io.grpc.PersistentHashArrayMappedTrie", // OpenTelemetry API dependency
"io.grpc.ThreadLocalContextStorage", // OpenTelemetry API dependency
"org.slf4j",
"ch.qos.logback",
// Tomcat's servlet classes must be on boostrap
// when running tomcat test
"javax.servlet.ServletContainerInitializer",
"javax.servlet.ServletContext"
};
TEST_BOOTSTRAP_PREFIXES =
Arrays.copyOf(
BOOTSTRAP_PACKAGE_PREFIXES_COPY,
BOOTSTRAP_PACKAGE_PREFIXES_COPY.length + testBS.length);
System.arraycopy(testBS, 0, TEST_BOOTSTRAP_PREFIXES, BOOTSTRAP_PACKAGE_PREFIXES_COPY.length,
testBS.length);
System.arraycopy(
testBS, 0, TEST_BOOTSTRAP_PREFIXES, BOOTSTRAP_PACKAGE_PREFIXES_COPY.length, testBS.length);
for (int i = 0; i < TEST_BOOTSTRAP_PREFIXES.length; i++) {
TEST_BOOTSTRAP_PREFIXES[i] = TEST_BOOTSTRAP_PREFIXES[i].replace('.', '/');
}
}
private static final String[] NOT_BOOTSTRAP_PREFIXES = {"io/grpc/Contexts"};
@Override
public void apply(Project project) {
project.getPlugins().apply(JavaLibraryPlugin.class);
@ -144,8 +147,10 @@ public class AutoInstrumentationPlugin implements Plugin<Project> {
});
task.dependsOn(bootstrapJar);
task.getJvmArgumentProviders().add(new InstrumentationTestArgs(
new File(project.getBuildDir(), "libs/" + bootstrapJarName)));
task.getJvmArgumentProviders()
.add(
new InstrumentationTestArgs(
new File(project.getBuildDir(), "libs/" + bootstrapJarName)));
});
}
@ -163,11 +168,17 @@ public class AutoInstrumentationPlugin implements Plugin<Project> {
@Override
public Iterable<String> asArguments() {
return Arrays.asList("-Xbootclasspath/a:" + bootstrapJar.getAbsolutePath(), "-Dnet.bytebuddy.raw=true");
return Arrays.asList(
"-Xbootclasspath/a:" + bootstrapJar.getAbsolutePath(), "-Dnet.bytebuddy.raw=true");
}
}
private static boolean isBootstrapClass(String filePath) {
for (String notBootstrapName : NOT_BOOTSTRAP_PREFIXES) {
if (filePath.startsWith(notBootstrapName)) {
return false;
}
}
for (String testBootstrapPrefix : TEST_BOOTSTRAP_PREFIXES) {
if (filePath.startsWith(testBootstrapPrefix)) {
return true;

View File

@ -98,9 +98,8 @@ public class TracingClientInterceptor implements ClientInterceptor {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
OpenTelemetry.getPropagators().getTextMapPropagator().inject(context, headers, SETTER);
try (Scope ignored = withScopedContext(context)) {
super.start(
new TracingClientCallListener<>(responseListener, span, context, tracer), headers);
try {
super.start(new TracingClientCallListener<>(responseListener, span, tracer), headers);
} catch (Throwable e) {
tracer.endExceptionally(span, e);
throw e;
@ -109,7 +108,7 @@ public class TracingClientInterceptor implements ClientInterceptor {
@Override
public void sendMessage(ReqT message) {
try (Scope ignored = withScopedContext(context)) {
try {
super.sendMessage(message);
} catch (Throwable e) {
tracer.endExceptionally(span, e);
@ -121,16 +120,13 @@ public class TracingClientInterceptor implements ClientInterceptor {
static final class TracingClientCallListener<RespT>
extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {
private final Span span;
private final Context context;
private final GrpcClientTracer tracer;
private final AtomicLong messageId = new AtomicLong();
TracingClientCallListener(
Listener<RespT> delegate, Span span, Context context, GrpcClientTracer tracer) {
TracingClientCallListener(Listener<RespT> delegate, Span span, GrpcClientTracer tracer) {
super(delegate);
this.span = span;
this.context = context;
this.tracer = tracer;
}
@ -143,7 +139,7 @@ public class TracingClientInterceptor implements ClientInterceptor {
SemanticAttributes.GRPC_MESSAGE_ID,
messageId.incrementAndGet());
span.addEvent("message", attributes);
try (Scope ignored = withScopedContext(context)) {
try {
delegate().onMessage(message);
} catch (Throwable e) {
tracer.addThrowable(span, e);
@ -152,7 +148,7 @@ public class TracingClientInterceptor implements ClientInterceptor {
@Override
public void onClose(Status status, Metadata trailers) {
try (Scope ignored = withScopedContext(context)) {
try {
delegate().onClose(status, trailers);
} catch (Throwable e) {
tracer.endExceptionally(span, e);
@ -163,7 +159,7 @@ public class TracingClientInterceptor implements ClientInterceptor {
@Override
public void onReady() {
try (Scope ignored = withScopedContext(context)) {
try {
delegate().onReady();
} catch (Throwable e) {
tracer.endExceptionally(span, e);

View File

@ -5,8 +5,8 @@
package io.opentelemetry.instrumentation.grpc.v1_5.server;
import static io.opentelemetry.trace.TracingContextUtils.currentContextWith;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Grpc;
@ -17,10 +17,10 @@ import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.opentelemetry.common.Attributes;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.grpc.v1_5.common.GrpcHelper;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Tracer;
import io.opentelemetry.trace.TracingContextUtils;
import io.opentelemetry.trace.attributes.SemanticAttributes;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -62,25 +62,18 @@ public class TracingServerInterceptor implements ServerInterceptor {
}
GrpcHelper.prepareSpan(span, methodName);
ServerCall.Listener<ReqT> result;
try (Scope ignored = currentContextWith(span)) {
Context context = TracingContextUtils.withSpan(span, Context.current());
try {
// Wrap the server call so that we can decorate the span
// with the resulting status
TracingServerCall<ReqT, RespT> tracingServerCall =
new TracingServerCall<>(call, span, tracer);
// call other interceptors
result = next.startCall(tracingServerCall, headers);
} catch (Throwable e) {
tracer.endExceptionally(span, e);
throw e;
}
try {
return new TracingServerCallListener<>(
Contexts.interceptCall(
context, new TracingServerCall<>(call, span, tracer), headers, next),
span,
tracer);
} catch (Throwable e) {
tracer.endExceptionally(span, e);
throw e;
}
// This ensures the server implementation can see the span in scope
return new TracingServerCallListener<>(result, span, tracer);
}
static final class TracingServerCall<ReqT, RespT>
@ -97,7 +90,7 @@ public class TracingServerInterceptor implements ServerInterceptor {
@Override
public void close(Status status, Metadata trailers) {
tracer.setStatus(span, status);
try (Scope ignored = currentContextWith(span)) {
try {
delegate().close(status, trailers);
} catch (Throwable e) {
tracer.endExceptionally(span, e);
@ -128,14 +121,12 @@ public class TracingServerInterceptor implements ServerInterceptor {
SemanticAttributes.GRPC_MESSAGE_ID,
messageId.incrementAndGet());
span.addEvent("message", attributes);
try (Scope ignored = currentContextWith(span)) {
delegate().onMessage(message);
}
delegate().onMessage(message);
}
@Override
public void onHalfClose() {
try (Scope ignored = currentContextWith(span)) {
try {
delegate().onHalfClose();
} catch (Throwable e) {
tracer.endExceptionally(span, e);
@ -145,7 +136,7 @@ public class TracingServerInterceptor implements ServerInterceptor {
@Override
public void onCancel() {
try (Scope ignored = currentContextWith(span)) {
try {
delegate().onCancel();
span.setAttribute("canceled", true);
} catch (Throwable e) {
@ -157,7 +148,7 @@ public class TracingServerInterceptor implements ServerInterceptor {
@Override
public void onComplete() {
try (Scope ignored = currentContextWith(span)) {
try {
delegate().onComplete();
} catch (Throwable e) {
tracer.endExceptionally(span, e);
@ -168,7 +159,7 @@ public class TracingServerInterceptor implements ServerInterceptor {
@Override
public void onReady() {
try (Scope ignored = currentContextWith(span)) {
try {
delegate().onReady();
} catch (Throwable e) {
tracer.endExceptionally(span, e);

View File

@ -13,18 +13,31 @@ import static io.opentelemetry.trace.Span.Kind.SERVER
import example.GreeterGrpc
import example.Helloworld
import io.grpc.BindableService
import io.grpc.CallOptions
import io.grpc.Channel
import io.grpc.ClientCall
import io.grpc.ClientInterceptor
import io.grpc.Context
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import io.grpc.Metadata
import io.grpc.MethodDescriptor
import io.grpc.Server
import io.grpc.ServerBuilder
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import io.grpc.Status
import io.grpc.StatusRuntimeException
import io.grpc.stub.StreamObserver
import io.opentelemetry.auto.test.InstrumentationSpecification
import io.opentelemetry.auto.test.utils.PortUtils
import io.opentelemetry.trace.StatusCanonicalCode
import io.opentelemetry.trace.TracingContextUtils
import io.opentelemetry.trace.attributes.SemanticAttributes
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import spock.lang.Unroll
@Unroll
@ -286,4 +299,153 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
"Status - description" | Status.PERMISSION_DENIED.withDescription("some description")
"StatusRuntime - description" | Status.UNIMPLEMENTED.withDescription("some description")
}
def "test user context preserved"() {
setup:
Context.Key<String> key = Context.key("cat")
BindableService greeter = new GreeterGrpc.GreeterImplBase() {
@Override
void sayHello(
final Helloworld.Request req, final StreamObserver<Helloworld.Response> responseObserver) {
if (key.get() != "meow") {
responseObserver.onError(new AssertionError((Object) "context not preserved"))
return
}
if (!TracingContextUtils.getSpan(Context.current()).getContext().isValid()) {
responseObserver.onError(new AssertionError((Object) "span not attached"))
return
}
final Helloworld.Response reply = Helloworld.Response.newBuilder().setMessage("Hello $req.name").build()
responseObserver.onNext(reply)
responseObserver.onCompleted()
}
}
def port = PortUtils.randomOpenPort()
Server server
server = configureServer(ServerBuilder.forPort(port)
.addService(greeter))
.intercept(new ServerInterceptor() {
@Override
<ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
def ctx = Context.current().withValue(key, "meow")
def oldCtx = ctx.attach()
try {
return next.startCall(call, headers)
} finally {
ctx.detach(oldCtx)
}
}
})
.build().start()
ManagedChannelBuilder channelBuilder
channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port))
.intercept(new ClientInterceptor() {
@Override
<ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
def ctx = Context.current().withValue(key, "meow")
def oldCtx = ctx.attach()
try {
return next.newCall(method, callOptions)
} finally {
ctx.detach(oldCtx)
}
}
})
// Depending on the version of gRPC usePlainText may or may not take an argument.
try {
channelBuilder.usePlaintext()
} catch (MissingMethodException e) {
channelBuilder.usePlaintext(true)
}
ManagedChannel channel = channelBuilder.build()
def client = GreeterGrpc.newStub(channel)
when:
AtomicReference<Helloworld.Response> response = new AtomicReference<>()
AtomicReference<Throwable> error = new AtomicReference<>()
CountDownLatch latch = new CountDownLatch(1)
runUnderTrace("parent") {
client.sayHello(
Helloworld.Request.newBuilder().setName("test").build(),
new StreamObserver<Helloworld.Response>() {
@Override
void onNext(Helloworld.Response r) {
if (key.get() != "meow") {
error.set(new AssertionError((Object) "context not preserved"))
return
}
if (!TracingContextUtils.getSpan(Context.current()).getContext().isValid()) {
error.set(new AssertionError((Object) "span not attached"))
return
}
response.set(r)
}
@Override
void onError(Throwable throwable) {
error.set(throwable)
}
@Override
void onCompleted() {
latch.countDown()
}
})
}
latch.await(10, TimeUnit.SECONDS)
then:
error.get() == null
response.get().message == "Hello test"
assertTraces(1) {
trace(0, 3) {
basicSpan(it, 0, "parent")
span(1) {
name "example.Greeter/SayHello"
kind CLIENT
childOf span(0)
errored false
event(0) {
eventName "message"
attributes {
"message.type" "SENT"
"message.id" 1
}
}
attributes {
"${SemanticAttributes.RPC_SYSTEM.key()}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key()}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key()}" "SayHello"
}
}
span(2) {
name "example.Greeter/SayHello"
kind SERVER
childOf span(1)
errored false
event(0) {
eventName "message"
attributes {
"message.type" "RECEIVED"
"message.id" 1
}
}
attributes {
"${SemanticAttributes.RPC_SYSTEM.key()}" "grpc"
"${SemanticAttributes.RPC_SERVICE.key()}" "example.Greeter"
"${SemanticAttributes.RPC_METHOD.key()}" "SayHello"
"${SemanticAttributes.NET_PEER_IP.key()}" "127.0.0.1"
"${SemanticAttributes.NET_PEER_PORT.key()}" Long
}
}
}
}
cleanup:
channel?.shutdownNow()?.awaitTermination(10, TimeUnit.SECONDS)
server?.shutdownNow()?.awaitTermination()
}
}