Rework grpc cancelation propagation (#8957)

This commit is contained in:
Lauri Tulmin 2023-08-08 11:00:10 +03:00 committed by GitHub
parent 47aca546c5
commit d749ac0091
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 139 additions and 19 deletions

View File

@ -36,5 +36,11 @@ tasks {
jvmArgs("-Dotel.javaagent.experimental.thread-propagation-debugger.enabled=false")
jvmArgs("-Dotel.instrumentation.grpc.capture-metadata.client.request=some-client-key")
jvmArgs("-Dotel.instrumentation.grpc.capture-metadata.server.request=some-server-key")
// exclude our grpc library instrumentation, the ContextStorageOverride contained within it
// breaks the tests
classpath = classpath.filter {
!it.absolutePath.contains("opentelemetry-grpc-1.6")
}
}
}

View File

@ -36,14 +36,20 @@ public class GrpcContextInstrumentation implements TypeInstrumentation {
@SuppressWarnings("unused")
public static class ContextBridgeAdvice {
@Advice.OnMethodEnter(skipOn = Advice.OnDefaultValue.class)
public static Object onEnter() {
return null;
@Advice.OnMethodEnter(skipOn = Advice.OnNonDefaultValue.class)
public static Context.Storage onEnter() {
return GrpcSingletons.getStorage();
}
@Advice.OnMethodExit
public static void onExit(@Advice.Return(readOnly = false) Context.Storage storage) {
storage = GrpcSingletons.STORAGE;
public static void onExit(
@Advice.Return(readOnly = false) Context.Storage storage,
@Advice.Enter Context.Storage ourStorage) {
if (ourStorage != null) {
storage = ourStorage;
} else {
storage = GrpcSingletons.setStorage(storage);
}
}
}
}

View File

@ -15,6 +15,7 @@ import io.opentelemetry.instrumentation.grpc.v1_6.GrpcTelemetry;
import io.opentelemetry.instrumentation.grpc.v1_6.internal.ContextStorageBridge;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
// Holds singleton references.
public final class GrpcSingletons {
@ -23,7 +24,7 @@ public final class GrpcSingletons {
public static final ServerInterceptor SERVER_INTERCEPTOR;
public static final Context.Storage STORAGE = new ContextStorageBridge(false);
private static final AtomicReference<Context.Storage> STORAGE_REFERENCE = new AtomicReference<>();
static {
boolean experimentalSpanAttributes =
@ -48,5 +49,14 @@ public final class GrpcSingletons {
SERVER_INTERCEPTOR = telemetry.newServerInterceptor();
}
public static Context.Storage getStorage() {
return STORAGE_REFERENCE.get();
}
public static Context.Storage setStorage(Context.Storage storage) {
STORAGE_REFERENCE.compareAndSet(null, new ContextStorageBridge(storage));
return getStorage();
}
private GrpcSingletons() {}
}

View File

@ -28,11 +28,24 @@ public final class ContextStorageBridge extends Context.Storage {
private static final Context.Key<io.opentelemetry.context.Context> OTEL_CONTEXT =
Context.key("otel-context");
private static final Context.Key<Scope> OTEL_SCOPE = Context.key("otel-scope");
// context attached to original context store
private static final Context.Key<Context> ORIGINAL_CONTEXT = Context.key("original-context");
// context that should be restored in original context store on detach
private static final Context.Key<Context> ORIGINAL_TO_RESTORE =
Context.key("original-to-restore");
private final boolean propagateGrpcDeadline;
// original context storage that would have been used when running without agent
private final Context.Storage originalStorage;
public ContextStorageBridge(boolean propagateGrpcDeadline) {
this.propagateGrpcDeadline = propagateGrpcDeadline;
this.originalStorage = null;
}
public ContextStorageBridge(Context.Storage originalStorage) {
propagateGrpcDeadline = false;
this.originalStorage = originalStorage;
}
@Override
@ -45,7 +58,9 @@ public final class ContextStorageBridge extends Context.Storage {
}
if (current == toAttach) {
return current.withValue(OTEL_SCOPE, Scope.noop());
Context result = current.withValue(OTEL_SCOPE, Scope.noop());
result = attachOriginalContextStorage(result);
return result;
}
io.opentelemetry.context.Context base = OTEL_CONTEXT.get(toAttach);
@ -64,11 +79,28 @@ public final class ContextStorageBridge extends Context.Storage {
}
Scope scope = newOtelContext.makeCurrent();
return current.withValue(OTEL_SCOPE, scope);
Context result = current.withValue(OTEL_SCOPE, scope);
result = attachOriginalContextStorage(result);
return result;
}
private Context attachOriginalContextStorage(Context context) {
Context result = context;
if (originalStorage != null) {
Context originalToRestore = originalStorage.doAttach(result);
result = result.withValues(ORIGINAL_CONTEXT, result, ORIGINAL_TO_RESTORE, originalToRestore);
}
return result;
}
@Override
public void detach(Context toDetach, Context toRestore) {
if (originalStorage != null) {
Context originalContext = ORIGINAL_CONTEXT.get(toRestore);
Context originalToRestore = ORIGINAL_TO_RESTORE.get(toRestore);
originalStorage.detach(originalContext, originalToRestore);
}
Scope scope = OTEL_SCOPE.get(toRestore);
if (scope == null) {
logger.log(
@ -93,17 +125,18 @@ public final class ContextStorageBridge extends Context.Storage {
// create a new context referring to the current OTel context to reflect the current stack.
// The previous context is unaffected and will continue to live in its own stack.
if (!propagateGrpcDeadline) {
// Because we are propagating gRPC context via OpenTelemetry here, we may also propagate a
// deadline where it
// wasn't present before. Notably, this could happen with no user intention when using the
// javaagent which will
// add OpenTelemetry propagation automatically, and cause that code to fail with a deadline
// cancellation. While
// ideally we could propagate deadline as well as gRPC intended, we cannot have existing
// code fail because it
// added the javaagent and choose to fork here.
current = current.fork();
if (!propagateGrpcDeadline && originalStorage != null) {
Context originalCurrent = originalStorage.current();
// check whether grpc context would have propagated without otel context
if (originalCurrent == null || originalCurrent == Context.ROOT) {
// Because we are propagating gRPC context via OpenTelemetry here, we may also propagate a
// deadline where it wasn't present before. Notably, this could happen with no user
// intention when using the javaagent which will add OpenTelemetry propagation
// automatically, and cause that code to fail with a deadline cancellation. While ideally
// we could propagate deadline as well as gRPC intended, we cannot have existing code fail
// because it added the javaagent and choose to fork here.
current = current.fork();
}
}
return current.withValue(OTEL_CONTEXT, otelContext);

View File

@ -59,6 +59,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
@ -1469,6 +1470,70 @@ public abstract class AbstractGrpcTest {
assertThat(error).hasValue(null);
}
// Regression test for
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/8923
@Test
void cancelListenerCalled() throws Exception {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch cancelLatch = new CountDownLatch(1);
AtomicBoolean cancelCalled = new AtomicBoolean();
Server server =
configureServer(
ServerBuilder.forPort(0)
.addService(
new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
Helloworld.Request request,
StreamObserver<Helloworld.Response> responseObserver) {
startLatch.countDown();
io.grpc.Context context = io.grpc.Context.current();
context.addListener(
context1 -> cancelCalled.set(true), MoreExecutors.directExecutor());
try {
cancelLatch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
responseObserver.onNext(
Helloworld.Response.newBuilder()
.setMessage(request.getName())
.build());
responseObserver.onCompleted();
}
}))
.build()
.start();
ManagedChannel channel = createChannel(server);
closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS));
closer.add(() -> server.shutdownNow().awaitTermination());
GreeterGrpc.GreeterFutureStub client = GreeterGrpc.newFutureStub(channel);
ListenableFuture<Helloworld.Response> future =
client.sayHello(Helloworld.Request.newBuilder().setName("test").build());
startLatch.await(10, TimeUnit.SECONDS);
future.cancel(false);
cancelLatch.countDown();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("example.Greeter/SayHello")
.hasKind(SpanKind.CLIENT)
.hasNoParent(),
span ->
span.hasName("example.Greeter/SayHello")
.hasKind(SpanKind.SERVER)
.hasParent(trace.getSpan(0))));
assertThat(cancelCalled.get()).isEqualTo(true);
}
@Test
void setCapturedRequestMetadata() throws Exception {
String metadataAttributePrefix = "rpc.grpc.request.metadata.";