Add gRPC context bridge (#2634)

* Add gRPC context bridge

* Finish

* Drift

* dependency hell

* Override

* current
This commit is contained in:
Anuraag Agrawal 2021-04-14 13:50:43 +09:00 committed by GitHub
parent 078603caf5
commit e87564ef12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 356 additions and 16 deletions

View File

@ -36,4 +36,10 @@ dependencies {
testLibrary group: 'io.grpc', name: 'grpc-stub', version: grpcVersion
testImplementation project(':instrumentation:grpc-1.5:testing')
}
}
test {
// The agent context debug mechanism isn't compatible with the bridge approach which may add a
// gRPC context to the root.
jvmArgs "-Dotel.javaagent.experimental.thread-propagation-debugger.enabled=false"
}

View File

@ -45,7 +45,7 @@ public class GrpcClientBuilderBuildInstrumentation implements TypeInstrumentatio
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void addInterceptor(
@Advice.FieldValue("interceptors") List<ClientInterceptor> interceptors) {
interceptors.add(0, GrpcInterceptors.CLIENT_INTERCEPTOR);
interceptors.add(0, GrpcSingletons.CLIENT_INTERCEPTOR);
}
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.grpc.v1_5;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import io.grpc.Context;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class GrpcContextInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return named("io.grpc.Context");
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return Collections.singletonMap(
isMethod()
.and(isStatic())
.and(named("storage"))
.and(returns(named("io.grpc.Context$Storage"))),
GrpcContextInstrumentation.class.getName() + "$ContextBridgeAdvice");
}
public static class ContextBridgeAdvice {
@Advice.OnMethodEnter(skipOn = Advice.OnDefaultValue.class)
public static Object onEnter() {
return null;
}
@Advice.OnMethodExit
public static void onExit(@Advice.Return(readOnly = false) Context.Storage storage) {
storage = GrpcSingletons.STORAGE;
}
}
}

View File

@ -21,6 +21,8 @@ public class GrpcInstrumentationModule extends InstrumentationModule {
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new GrpcClientBuilderBuildInstrumentation(), new GrpcServerBuilderInstrumentation());
new GrpcClientBuilderBuildInstrumentation(),
new GrpcContextInstrumentation(),
new GrpcServerBuilderInstrumentation());
}
}

View File

@ -47,7 +47,7 @@ public class GrpcServerBuilderInstrumentation implements TypeInstrumentation {
public static void onEnter(@Advice.This ServerBuilder<?> serverBuilder) {
int callDepth = CallDepthThreadLocalMap.incrementCallDepth(ServerBuilder.class);
if (callDepth == 0) {
serverBuilder.intercept(GrpcInterceptors.SERVER_INTERCEPTOR);
serverBuilder.intercept(GrpcSingletons.SERVER_INTERCEPTOR);
}
}

View File

@ -6,13 +6,15 @@
package io.opentelemetry.javaagent.instrumentation.grpc.v1_5;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.ServerInterceptor;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.grpc.v1_5.GrpcTracing;
import io.opentelemetry.instrumentation.grpc.v1_5.internal.ContextStorageBridge;
// Holds singleton references to tracers.
public final class GrpcInterceptors {
// Holds singleton references.
public final class GrpcSingletons {
private static final GrpcTracing TRACING =
GrpcTracing.newBuilder(GlobalOpenTelemetry.get())
.setCaptureExperimentalSpanAttributes(
@ -24,4 +26,6 @@ public final class GrpcInterceptors {
public static final ClientInterceptor CLIENT_INTERCEPTOR = TRACING.newClientInterceptor();
public static final ServerInterceptor SERVER_INTERCEPTOR = TRACING.newServerInterceptor();
public static final Context.Storage STORAGE = new ContextStorageBridge();
}

View File

@ -9,5 +9,6 @@ dependencies {
testLibrary group: 'io.grpc', name: 'grpc-protobuf', version: grpcVersion
testLibrary group: 'io.grpc', name: 'grpc-stub', version: grpcVersion
testImplementation deps.assertj
testImplementation project(':instrumentation:grpc-1.5:testing')
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.grpc.override;
import io.grpc.Context;
import io.opentelemetry.instrumentation.grpc.v1_5.internal.ContextStorageBridge;
/**
* Override class for gRPC to pick up this class to replace the default {@link Context.Storage} with
* an OpenTelemetry bridge.
*/
public final class ContextStorageOverride extends Context.Storage {
private static final Context.Storage delegate = new ContextStorageBridge();
@Override
public void attach(Context toAttach) {
delegate.attach(toAttach);
}
@Override
public void detach(Context toDetach, Context toRestore) {
delegate.detach(toDetach, toRestore);
}
@Override
public Context current() {
return delegate.current();
}
}

View File

@ -5,6 +5,7 @@
package io.opentelemetry.instrumentation.grpc.v1_5;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Grpc;
@ -54,7 +55,9 @@ final class TracingServerInterceptor implements ServerInterceptor {
try (Scope ignored = context.makeCurrent()) {
return new TracingServerCallListener<>(
next.startCall(new TracingServerCall<>(call, context), headers), context);
Contexts.interceptCall(
io.grpc.Context.current(), new TracingServerCall<>(call, context), headers, next),
context);
} catch (Throwable e) {
tracer.endExceptionally(context, e);
throw e;
@ -73,7 +76,7 @@ final class TracingServerInterceptor implements ServerInterceptor {
@Override
public void close(Status status, Metadata trailers) {
tracer.setStatus(context, status);
try (Scope ignored = context.makeCurrent()) {
try {
delegate().close(status, trailers);
} catch (Throwable e) {
tracer.endExceptionally(context, e);
@ -103,14 +106,12 @@ final class TracingServerInterceptor implements ServerInterceptor {
GrpcHelper.MESSAGE_ID,
messageId.incrementAndGet());
Span.fromContext(context).addEvent("message", attributes);
try (Scope ignored = context.makeCurrent()) {
delegate().onMessage(message);
}
delegate().onMessage(message);
}
@Override
public void onHalfClose() {
try (Scope ignored = context.makeCurrent()) {
try {
delegate().onHalfClose();
} catch (Throwable e) {
tracer.endExceptionally(context, e);
@ -120,7 +121,7 @@ final class TracingServerInterceptor implements ServerInterceptor {
@Override
public void onCancel() {
try (Scope ignored = context.makeCurrent()) {
try {
delegate().onCancel();
if (captureExperimentalSpanAttributes) {
Span.fromContext(context).setAttribute("grpc.canceled", true);
@ -134,7 +135,7 @@ final class TracingServerInterceptor implements ServerInterceptor {
@Override
public void onComplete() {
try (Scope ignored = context.makeCurrent()) {
try {
delegate().onComplete();
} catch (Throwable e) {
tracer.endExceptionally(context, e);
@ -145,7 +146,7 @@ final class TracingServerInterceptor implements ServerInterceptor {
@Override
public void onReady() {
try (Scope ignored = context.makeCurrent()) {
try {
delegate().onReady();
} catch (Throwable e) {
tracer.endExceptionally(context, e);

View File

@ -0,0 +1,122 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.grpc.v1_5.internal;
import io.grpc.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.Scope;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* {@link Context.Storage} override which uses OpenTelemetry context as the backing store. Both gRPC
* and OpenTelemetry contexts refer to each other to ensure that both OTel context propagation
* mechanisms and gRPC context propagation mechanisms can be used interchangably.
*/
public final class ContextStorageBridge extends Context.Storage {
private static final Logger logger = Logger.getLogger(ContextStorageBridge.class.getName());
private static final ContextKey<Context> GRPC_CONTEXT = ContextKey.named("grpc-context");
private static final Context.Key<io.opentelemetry.context.Context> OTEL_CONTEXT =
Context.key("otel-context");
// Because the extension point is void, there is no way to return information about the backing
// OpenTelemetry context when attaching gRPC context. So the only option is to have this
// side-channel to keep track of scopes. Because the same context can be attached to multiple
// threads, we must use a ThreadLocal here - on the bright side it means the map doesn't have to
// be concurrent. This will add an additional threadlocal lookup when attaching / detaching gRPC
// context, but not when accessing the current. In many applications, this means a small
// difference
// since those operations are rare, but in highly reactive applications where the overhead of
// ThreadLocal was already a problem, this makes it worse.
private static final ThreadLocal<WeakHashMap<Context, Deque<Scope>>> contextScopes =
ThreadLocal.withInitial(WeakHashMap::new);
@Override
public void attach(Context toAttach) {
io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current();
Context current = otelContext.get(GRPC_CONTEXT);
if (current == toAttach) {
contextScopes
.get()
.computeIfAbsent(toAttach, unused -> new ArrayDeque<>())
.addLast(Scope.noop());
return;
}
io.opentelemetry.context.Context base = OTEL_CONTEXT.get(toAttach);
final io.opentelemetry.context.Context newOtelContext;
if (base != null) {
// gRPC context which has an OTel context associated with it via a call to
// ContextStorageOverride.current(). Using it as the base allows it to be propagated together
// with the gRPC context.
newOtelContext = base.with(GRPC_CONTEXT, toAttach);
} else {
// gRPC context without an OTel context associated with it. This is only possible when
// attaching a context directly created by Context.ROOT, e.g., Context.ROOT.with(...) which
// is not common. We go ahead and assume the gRPC context can be reset while using the current
// OTel context.
newOtelContext = io.opentelemetry.context.Context.current().with(GRPC_CONTEXT, toAttach);
}
Scope scope = newOtelContext.makeCurrent();
contextScopes.get().computeIfAbsent(toAttach, unused -> new ArrayDeque<>()).addLast(scope);
}
@Override
public void detach(Context toDetach, Context toRestore) {
io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current();
Context current = otelContext.get(GRPC_CONTEXT);
if (current != toDetach) {
// Log a severe message instead of throwing an exception as the context to attach is assumed
// to be the correct one and the unbalanced state represents a coding mistake in a lower
// layer in the stack that cannot be recovered from here.
logger.log(
Level.SEVERE,
"Context was not attached when detaching",
new Throwable().fillInStackTrace());
}
Map<Context, Deque<Scope>> contextStacks = contextScopes.get();
Deque<Scope> stack = contextStacks.get(toDetach);
Scope scope = stack.pollLast();
if (scope == null) {
logger.log(
Level.SEVERE,
"Detaching context which was not attached.",
new Throwable().fillInStackTrace());
} else {
scope.close();
}
if (stack.isEmpty()) {
contextStacks.remove(toDetach);
}
}
@Override
public Context current() {
io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current();
Context current = otelContext.get(GRPC_CONTEXT);
if (current == null) {
return Context.ROOT.withValue(OTEL_CONTEXT, otelContext);
}
// Store the current OTel context in the gRPC context so that gRPC context propagation
// mechanisms will also propagate the OTel context.
io.opentelemetry.context.Context previousOtelContext = OTEL_CONTEXT.get(current);
if (previousOtelContext != otelContext) {
// This context has already been previously attached and associated with an OTel context. Just
// create a new context referring to the current OTel context to reflect the current stack.
// The previous context is unaffected and will continue to live in its own stack.
return current.withValue(OTEL_CONTEXT, otelContext);
}
return current;
}
}

View File

@ -0,0 +1,122 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.grpc.v1_5;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.Scope;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
class ContextBridgeTest {
private static final ContextKey<String> ANIMAL = ContextKey.named("animal");
private static final io.grpc.Context.Key<String> FOOD = io.grpc.Context.key("food");
private static final io.grpc.Context.Key<String> COUNTRY = io.grpc.Context.key("country");
private static ExecutorService otherThread;
@BeforeAll
static void setUp() {
otherThread = Executors.newSingleThreadExecutor();
}
@AfterAll
static void tearDown() {
otherThread.shutdown();
}
@Test
void grpcOtelMix() {
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan");
assertThat(COUNTRY.get()).isNull();
io.grpc.Context root = grpcContext.attach();
try {
assertThat(COUNTRY.get()).isEqualTo("japan");
try (Scope ignored = Context.current().with(ANIMAL, "cat").makeCurrent()) {
assertThat(Context.current().get(ANIMAL)).isEqualTo("cat");
assertThat(COUNTRY.get()).isEqualTo("japan");
io.grpc.Context context2 = io.grpc.Context.current().withValue(FOOD, "cheese");
assertThat(FOOD.get()).isNull();
io.grpc.Context toRestore = context2.attach();
try {
assertThat(FOOD.get()).isEqualTo("cheese");
assertThat(COUNTRY.get()).isEqualTo("japan");
assertThat(Context.current().get(ANIMAL)).isEqualTo("cat");
} finally {
context2.detach(toRestore);
}
}
} finally {
grpcContext.detach(root);
}
}
@Test
void grpcWrap() throws Exception {
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan");
io.grpc.Context root = grpcContext.attach();
try {
try (Scope ignored = Context.current().with(ANIMAL, "cat").makeCurrent()) {
assertThat(COUNTRY.get()).isEqualTo("japan");
assertThat(Context.current().get(ANIMAL)).isEqualTo("cat");
AtomicReference<String> grpcValue = new AtomicReference<>();
AtomicReference<String> otelValue = new AtomicReference<>();
Runnable runnable =
() -> {
grpcValue.set(COUNTRY.get());
otelValue.set(Context.current().get(ANIMAL));
};
otherThread.submit(runnable).get();
assertThat(grpcValue).hasValue(null);
assertThat(otelValue).hasValue(null);
otherThread.submit(io.grpc.Context.current().wrap(runnable)).get();
assertThat(grpcValue).hasValue("japan");
assertThat(otelValue).hasValue("cat");
}
} finally {
grpcContext.detach(root);
}
}
@Test
void otelWrap() throws Exception {
io.grpc.Context grpcContext = io.grpc.Context.current().withValue(COUNTRY, "japan");
io.grpc.Context root = grpcContext.attach();
try {
try (Scope ignored = Context.current().with(ANIMAL, "cat").makeCurrent()) {
assertThat(COUNTRY.get()).isEqualTo("japan");
assertThat(Context.current().get(ANIMAL)).isEqualTo("cat");
AtomicReference<String> grpcValue = new AtomicReference<>();
AtomicReference<String> otelValue = new AtomicReference<>();
Runnable runnable =
() -> {
grpcValue.set(COUNTRY.get());
otelValue.set(Context.current().get(ANIMAL));
};
otherThread.submit(runnable).get();
assertThat(grpcValue).hasValue(null);
assertThat(otelValue).hasValue(null);
otherThread.submit(Context.current().wrap(runnable)).get();
assertThat(grpcValue).hasValue("japan");
assertThat(otelValue).hasValue("cat");
}
} finally {
grpcContext.detach(root);
}
}
}