Fix JMS context propagation problems (#2702)

This commit is contained in:
Mateusz Rzeszutek 2021-04-06 17:45:17 +02:00 committed by GitHub
parent 53ffe9fbef
commit 5760cd7313
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 98 additions and 18 deletions

View File

@ -194,6 +194,47 @@ class Jms2Test extends AgentInstrumentationSpecification {
session.createTopic("someTopic") | "topic" | "someTopic" session.createTopic("someTopic") | "topic" | "someTopic"
} }
def "sending a message to #destinationName #destinationType with explicit destination propagates context"() {
given:
def producer = session.createProducer(null)
def consumer = session.createConsumer(destination)
def lock = new CountDownLatch(1)
def messageRef = new AtomicReference<TextMessage>()
consumer.setMessageListener new MessageListener() {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
}
}
when:
producer.send(destination, message)
lock.countDown()
then:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
}
}
// This check needs to go after all traces have been accounted for
messageRef.get().text == messageText
cleanup:
producer.close()
consumer.close()
where:
destination | destinationType | destinationName
session.createQueue("someQueue") | "queue" | "someQueue"
session.createTopic("someTopic") | "topic" | "someTopic"
session.createTemporaryQueue() | "queue" | "(temporary)"
session.createTemporaryTopic() | "topic" | "(temporary)"
}
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) { static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
trace.span(index) { trace.span(index) {
name destinationName + " send" name destinationName + " send"

View File

@ -77,9 +77,7 @@ public class JmsMessageProducerInstrumentation implements TypeInstrumentation {
MessageDestination messageDestination = MessageDestination messageDestination =
tracer().extractDestination(message, defaultDestination); tracer().extractDestination(message, defaultDestination);
context = tracer().startProducerSpan(messageDestination, message); context = tracer().startProducerSpan(messageDestination, message);
// TODO: why are we propagating context only in this advice class? the other one does not scope = context.makeCurrent();
// inject current span context into JMS message
scope = tracer().startProducerScope(context, message);
} }
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
@ -90,9 +88,9 @@ public class JmsMessageProducerInstrumentation implements TypeInstrumentation {
if (scope == null) { if (scope == null) {
return; return;
} }
scope.close();
CallDepthThreadLocalMap.reset(MessageProducer.class); CallDepthThreadLocalMap.reset(MessageProducer.class);
scope.close();
if (throwable != null) { if (throwable != null) {
tracer().endExceptionally(context, throwable); tracer().endExceptionally(context, throwable);
} else { } else {
@ -129,6 +127,7 @@ public class JmsMessageProducerInstrumentation implements TypeInstrumentation {
} }
CallDepthThreadLocalMap.reset(MessageProducer.class); CallDepthThreadLocalMap.reset(MessageProducer.class);
scope.close();
if (throwable != null) { if (throwable != null) {
tracer().endExceptionally(context, throwable); tracer().endExceptionally(context, throwable);
} else { } else {

View File

@ -10,10 +10,8 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER;
import static io.opentelemetry.javaagent.instrumentation.jms.MessageExtractAdapter.GETTER; import static io.opentelemetry.javaagent.instrumentation.jms.MessageExtractAdapter.GETTER;
import static io.opentelemetry.javaagent.instrumentation.jms.MessageInjectAdapter.SETTER; import static io.opentelemetry.javaagent.instrumentation.jms.MessageInjectAdapter.SETTER;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer; import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -43,12 +41,7 @@ public class JmsTracer extends BaseTracer {
MessageDestination destination, String operation, Message message, long startTime) { MessageDestination destination, String operation, Message message, long startTime) {
Context parentContext = Context.root(); Context parentContext = Context.root();
if (message != null && "process".equals(operation)) { if (message != null && "process".equals(operation)) {
// TODO use BaseTracer.extract() which has context leak detection parentContext = extract(message, GETTER);
// (and fix the context leak that it is currently detecting when running Jms2Test)
parentContext =
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.extract(Context.root(), message, GETTER);
} }
SpanBuilder spanBuilder = SpanBuilder spanBuilder =
@ -64,12 +57,9 @@ public class JmsTracer extends BaseTracer {
Context parentContext = Context.current(); Context parentContext = Context.current();
SpanBuilder span = spanBuilder(parentContext, spanName(destination, "send"), PRODUCER); SpanBuilder span = spanBuilder(parentContext, spanName(destination, "send"), PRODUCER);
afterStart(span, destination, message); afterStart(span, destination, message);
return parentContext.with(span.startSpan()); Context context = parentContext.with(span.startSpan());
}
public Scope startProducerScope(Context context, Message message) {
inject(context, message, SETTER); inject(context, message, SETTER);
return context.makeCurrent(); return context;
} }
public String spanName(MessageDestination destination, String operation) { public String spanName(MessageDestination destination, String operation) {

View File

@ -24,7 +24,9 @@ import org.slf4j.LoggerFactory
import org.testcontainers.containers.GenericContainer import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.output.Slf4jLogConsumer import org.testcontainers.containers.output.Slf4jLogConsumer
import spock.lang.Shared import spock.lang.Shared
import spock.lang.Unroll
@Unroll
class Jms1Test extends AgentInstrumentationSpecification { class Jms1Test extends AgentInstrumentationSpecification {
private static final Logger logger = LoggerFactory.getLogger(Jms1Test) private static final Logger logger = LoggerFactory.getLogger(Jms1Test)
@ -224,6 +226,47 @@ class Jms1Test extends AgentInstrumentationSpecification {
session.createTemporaryTopic() | "topic" | "(temporary)" session.createTemporaryTopic() | "topic" | "(temporary)"
} }
def "sending a message to #destinationName #destinationType with explicit destination propagates context"() {
given:
def producer = session.createProducer(null)
def consumer = session.createConsumer(destination)
def lock = new CountDownLatch(1)
def messageRef = new AtomicReference<TextMessage>()
consumer.setMessageListener new MessageListener() {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
}
}
when:
producer.send(destination, message)
lock.countDown()
then:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
}
}
// This check needs to go after all traces have been accounted for
messageRef.get().text == messageText
cleanup:
producer.close()
consumer.close()
where:
destination | destinationType | destinationName
session.createQueue("someQueue") | "queue" | "someQueue"
session.createTopic("someTopic") | "topic" | "someTopic"
session.createTemporaryQueue() | "queue" | "(temporary)"
session.createTemporaryTopic() | "topic" | "(temporary)"
}
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) { static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
trace.span(index) { trace.span(index) {
name destinationName + " send" name destinationName + " send"

View File

@ -73,6 +73,13 @@ public class ExecutorInstrumentationUtils {
if (enclosingClass.getName().equals("com.squareup.okhttp.ConnectionPool")) { if (enclosingClass.getName().equals("com.squareup.okhttp.ConnectionPool")) {
return false; return false;
} }
// Avoid instrumenting internal OrderedExecutor worker class
if (enclosingClass
.getName()
.equals("org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor")) {
return false;
}
} }
return true; return true;