Do not propagate gRPC deadline when propagating OTel context via javaagent. (#5543)
* Add test for early return in gRPC pattern. * Do not propagate gRPC deadline when propagating OTel context via javaagent.
This commit is contained in:
parent
b25043c0e7
commit
1d9c23bfb3
|
@ -20,7 +20,7 @@ public final class GrpcSingletons {
|
||||||
|
|
||||||
public static final ServerInterceptor SERVER_INTERCEPTOR;
|
public static final ServerInterceptor SERVER_INTERCEPTOR;
|
||||||
|
|
||||||
public static final Context.Storage STORAGE = new ContextStorageBridge();
|
public static final Context.Storage STORAGE = new ContextStorageBridge(false);
|
||||||
|
|
||||||
static {
|
static {
|
||||||
boolean experimentalSpanAttributes =
|
boolean experimentalSpanAttributes =
|
||||||
|
|
|
@ -14,7 +14,7 @@ import io.opentelemetry.instrumentation.grpc.v1_6.internal.ContextStorageBridge;
|
||||||
*/
|
*/
|
||||||
public final class ContextStorageOverride extends Context.Storage {
|
public final class ContextStorageOverride extends Context.Storage {
|
||||||
|
|
||||||
private static final Context.Storage delegate = new ContextStorageBridge();
|
private static final Context.Storage delegate = new ContextStorageBridge(true);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Context doAttach(Context toAttach) {
|
public Context doAttach(Context toAttach) {
|
||||||
|
|
|
@ -28,6 +28,12 @@ public final class ContextStorageBridge extends Context.Storage {
|
||||||
Context.key("otel-context");
|
Context.key("otel-context");
|
||||||
private static final Context.Key<Scope> OTEL_SCOPE = Context.key("otel-scope");
|
private static final Context.Key<Scope> OTEL_SCOPE = Context.key("otel-scope");
|
||||||
|
|
||||||
|
private final boolean propagateGrpcDeadline;
|
||||||
|
|
||||||
|
public ContextStorageBridge(boolean propagateGrpcDeadline) {
|
||||||
|
this.propagateGrpcDeadline = propagateGrpcDeadline;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Context doAttach(Context toAttach) {
|
public Context doAttach(Context toAttach) {
|
||||||
io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current();
|
io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current();
|
||||||
|
@ -87,6 +93,20 @@ public final class ContextStorageBridge extends Context.Storage {
|
||||||
// This context has already been previously attached and associated with an OTel context. Just
|
// This context has already been previously attached and associated with an OTel context. Just
|
||||||
// create a new context referring to the current OTel context to reflect the current stack.
|
// create a new context referring to the current OTel context to reflect the current stack.
|
||||||
// The previous context is unaffected and will continue to live in its own stack.
|
// The previous context is unaffected and will continue to live in its own stack.
|
||||||
|
|
||||||
|
if (!propagateGrpcDeadline) {
|
||||||
|
// Because we are propagating gRPC context via OpenTelemetry here, we may also propagate a
|
||||||
|
// deadline where it
|
||||||
|
// wasn't present before. Notably, this could happen with no user intention when using the
|
||||||
|
// javaagent which will
|
||||||
|
// add OpenTelemetry propagation automatically, and cause that code to fail with a deadline
|
||||||
|
// cancellation. While
|
||||||
|
// ideally we could propagate deadline as well as gRPC intended, we cannot have existing
|
||||||
|
// code fail because it
|
||||||
|
// added the javaagent and choose to fork here.
|
||||||
|
current = current.fork();
|
||||||
|
}
|
||||||
|
|
||||||
return current.withValue(OTEL_CONTEXT, otelContext);
|
return current.withValue(OTEL_CONTEXT, otelContext);
|
||||||
}
|
}
|
||||||
return current;
|
return current;
|
||||||
|
|
|
@ -49,6 +49,8 @@ import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
@ -1370,6 +1372,87 @@ public abstract class AbstractGrpcTest {
|
||||||
SemanticAttributes.MESSAGE_ID, 2L))))));
|
SemanticAttributes.MESSAGE_ID, 2L))))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Regression test for
|
||||||
|
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/4169
|
||||||
|
@Test
|
||||||
|
void clientCallAfterServerCompleted() throws Exception {
|
||||||
|
Server backend =
|
||||||
|
configureServer(
|
||||||
|
ServerBuilder.forPort(0)
|
||||||
|
.addService(
|
||||||
|
new GreeterGrpc.GreeterImplBase() {
|
||||||
|
@Override
|
||||||
|
public void sayHello(
|
||||||
|
Helloworld.Request request,
|
||||||
|
StreamObserver<Helloworld.Response> responseObserver) {
|
||||||
|
responseObserver.onNext(
|
||||||
|
Helloworld.Response.newBuilder()
|
||||||
|
.setMessage(request.getName())
|
||||||
|
.build());
|
||||||
|
responseObserver.onCompleted();
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.build()
|
||||||
|
.start();
|
||||||
|
ManagedChannel backendChannel = createChannel(backend);
|
||||||
|
closer.add(() -> backendChannel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS));
|
||||||
|
closer.add(() -> backend.shutdownNow().awaitTermination());
|
||||||
|
GreeterGrpc.GreeterBlockingStub backendStub = GreeterGrpc.newBlockingStub(backendChannel);
|
||||||
|
|
||||||
|
// This executor does not propagate context without the javaagent available.
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||||
|
closer.add(executor::shutdownNow);
|
||||||
|
|
||||||
|
CountDownLatch clientCallDone = new CountDownLatch(1);
|
||||||
|
AtomicReference<Throwable> error = new AtomicReference<>();
|
||||||
|
|
||||||
|
Server frontend =
|
||||||
|
configureServer(
|
||||||
|
ServerBuilder.forPort(0)
|
||||||
|
.addService(
|
||||||
|
new GreeterGrpc.GreeterImplBase() {
|
||||||
|
@Override
|
||||||
|
public void sayHello(
|
||||||
|
Helloworld.Request request,
|
||||||
|
StreamObserver<Helloworld.Response> responseObserver) {
|
||||||
|
responseObserver.onNext(
|
||||||
|
Helloworld.Response.newBuilder()
|
||||||
|
.setMessage(request.getName())
|
||||||
|
.build());
|
||||||
|
responseObserver.onCompleted();
|
||||||
|
|
||||||
|
executor.execute(
|
||||||
|
() -> {
|
||||||
|
try {
|
||||||
|
backendStub.sayHello(request);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
error.set(t);
|
||||||
|
}
|
||||||
|
clientCallDone.countDown();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.build()
|
||||||
|
.start();
|
||||||
|
ManagedChannel frontendChannel = createChannel(frontend);
|
||||||
|
closer.add(() -> frontendChannel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS));
|
||||||
|
closer.add(() -> frontend.shutdownNow().awaitTermination());
|
||||||
|
|
||||||
|
GreeterGrpc.GreeterBlockingStub frontendStub = GreeterGrpc.newBlockingStub(frontendChannel);
|
||||||
|
frontendStub.sayHello(Helloworld.Request.newBuilder().setName("test").build());
|
||||||
|
|
||||||
|
// We don't assert on telemetry - the intention of this test is to verify that adding
|
||||||
|
// instrumentation, either as
|
||||||
|
// library or javaagent, does not cause exceptions in the business logic. The produced telemetry
|
||||||
|
// will be different
|
||||||
|
// for the two cases due to lack of context propagation in the library case, but that isn't what
|
||||||
|
// we're testing here.
|
||||||
|
|
||||||
|
clientCallDone.await(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
assertThat(error).hasValue(null);
|
||||||
|
}
|
||||||
|
|
||||||
private ManagedChannel createChannel(Server server) throws Exception {
|
private ManagedChannel createChannel(Server server) throws Exception {
|
||||||
ManagedChannelBuilder<?> channelBuilder =
|
ManagedChannelBuilder<?> channelBuilder =
|
||||||
configureClient(ManagedChannelBuilder.forAddress("localhost", server.getPort()));
|
configureClient(ManagedChannelBuilder.forAddress("localhost", server.getPort()));
|
||||||
|
|
Loading…
Reference in New Issue