diff --git a/instrumentation/rocketmq-client-4.8/library/build.gradle.kts b/instrumentation/rocketmq-client-4.8/library/build.gradle.kts index 7df224e978..7d981e0c0c 100644 --- a/instrumentation/rocketmq-client-4.8/library/build.gradle.kts +++ b/instrumentation/rocketmq-client-4.8/library/build.gradle.kts @@ -4,6 +4,10 @@ plugins { dependencies { library("org.apache.rocketmq:rocketmq-client:4.8.0") - testImplementation(project(":instrumentation:rocketmq-client-4.8:testing")) + + compileOnly("com.google.auto.value:auto-value-annotations") + annotationProcessor("com.google.auto.value:auto-value") + testLibrary("org.apache.rocketmq:rocketmq-test:4.8.0") + testImplementation(project(":instrumentation:rocketmq-client-4.8:testing")) } \ No newline at end of file diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/ContextAndScope.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/ContextAndScope.java new file mode 100644 index 0000000000..1acebf32f2 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/ContextAndScope.java @@ -0,0 +1,26 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +@AutoValue +abstract class ContextAndScope { + + static ContextAndScope create(Context context, Scope scope) { + return new AutoValue_ContextAndScope(context, scope); + } + + abstract Context getContext(); + + abstract Scope getScope(); + + void close() { + getScope().close(); + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java index 64ed0745b0..fd3d7d54b7 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java @@ -28,7 +28,12 @@ final class TracingConsumeMessageHookImpl implements ConsumeMessageHook { return; } Context otelContext = tracer.startSpan(Context.current(), context.getMsgList()); - context.setMqTraceContext(otelContext); + + // it's safe to store the scope in the rocketMq trace context, both before() and after() methods + // are always called from the same thread; see: + // - ConsumeMessageConcurrentlyService$ConsumeRequest#run() + // - ConsumeMessageOrderlyService$ConsumeRequest#run() + context.setMqTraceContext(ContextAndScope.create(otelContext, otelContext.makeCurrent())); } @Override @@ -36,9 +41,10 @@ final class TracingConsumeMessageHookImpl implements ConsumeMessageHook { if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { return; } - if (context.getMqTraceContext() instanceof Context) { - Context otelContext = (Context) context.getMqTraceContext(); - tracer.end(otelContext); + if (context.getMqTraceContext() instanceof ContextAndScope) { + ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext(); + contextAndScope.close(); + tracer.end(contextAndScope.getContext()); } } } diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy index 076ea2ebe4..6a271d57d9 100644 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy @@ -5,6 +5,11 @@ package io.opentelemetry.instrumentation.rocketmq +import static io.opentelemetry.api.trace.SpanKind.CONSUMER +import static io.opentelemetry.api.trace.SpanKind.INTERNAL +import static io.opentelemetry.api.trace.SpanKind.PRODUCER +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace + import base.BaseConf import io.opentelemetry.instrumentation.test.InstrumentationSpecification import io.opentelemetry.semconv.trace.attributes.SemanticAttributes @@ -14,19 +19,12 @@ import org.apache.rocketmq.client.producer.SendCallback import org.apache.rocketmq.client.producer.SendResult import org.apache.rocketmq.common.message.Message import org.apache.rocketmq.remoting.common.RemotingHelper -import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener import spock.lang.Shared import spock.lang.Unroll -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.PRODUCER -import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan -import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace @Unroll abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { - private static final int CONSUME_TIMEOUT = 30_000 - @Shared DefaultMQProducer producer @@ -34,10 +32,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { DefaultMQPushConsumer consumer @Shared - RMQOrderListener messageListener - - @Shared - def sharedTopic + String sharedTopic @Shared Message msg @@ -58,8 +53,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { msgs.add(msg2) producer = BaseConf.getProducer(BaseConf.nsAddr) configureMQProducer(producer) - messageListener = new RMQOrderListener() - consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", messageListener) + consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new TracingMessageListener()) configureMQPushConsumer(consumer) } @@ -69,10 +63,6 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { BaseConf.deleteTempDir() } - def setup() { - messageListener.clearMsg() - } - def "test rocketmq produce callback"() { when: producer.send(msg, new SendCallback() { @@ -84,10 +74,10 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { void onException(Throwable throwable) { } }) - messageListener.waitForMessageConsume(1, CONSUME_TIMEOUT) + then: assertTraces(1) { - trace(0, 2) { + trace(0, 3) { span(0) { name sharedTopic + " send" kind PRODUCER @@ -117,6 +107,11 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { "messaging.rocketmq.queue_offset" Long } } + span(2) { + name "messageListener" + kind INTERNAL + childOf span(1) + } } } } @@ -126,14 +121,18 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { runUnderTrace("parent") { producer.send(msg) } - messageListener.waitForMessageConsume(1, CONSUME_TIMEOUT) + then: assertTraces(1) { - trace(0, 3) { - basicSpan(it, 0, "parent") + trace(0, 4) { + span(0) { + name "parent" + kind INTERNAL + } span(1) { name sharedTopic + " send" kind PRODUCER + childOf span(0) attributes { "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic @@ -147,6 +146,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { span(2) { name sharedTopic + " process" kind CONSUMER + childOf span(1) attributes { "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic @@ -160,6 +160,11 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { "messaging.rocketmq.queue_offset" Long } } + span(3) { + name "messageListener" + kind INTERNAL + childOf span(2) + } } } } @@ -167,22 +172,27 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { def "test rocketmq produce and batch consume"() { setup: consumer.setConsumeMessageBatchMaxSize(2) + when: runUnderTrace("parent") { producer.send(msgs) } - messageListener.waitForMessageConsume(msgs.size(), CONSUME_TIMEOUT) + then: assertTraces(2) { - def itemStepSpan = null + def producerSpan = null trace(0, 2) { - itemStepSpan = span(1) + producerSpan = span(1) - basicSpan(it, 0, "parent") + span(0) { + name "parent" + kind INTERNAL + } span(1) { name sharedTopic + " send" kind PRODUCER + childOf span(0) attributes { "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic @@ -194,7 +204,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { } } - trace(1, 3) { + trace(1, 4) { span(0) { name "multiple_sources receive" kind CONSUMER @@ -219,7 +229,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { "messaging.rocketmq.queue_offset" Long } childOf span(0) - hasLink itemStepSpan + hasLink producerSpan } span(2) { name sharedTopic + " process" @@ -237,7 +247,12 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { "messaging.rocketmq.queue_offset" Long } childOf span(0) - hasLink itemStepSpan + hasLink producerSpan + } + span(3) { + name "messageListener" + kind INTERNAL + childOf span(0) } } } diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/TracingMessageListener.groovy b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/TracingMessageListener.groovy new file mode 100644 index 0000000000..d3fbadaf5d --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/TracingMessageListener.groovy @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq + +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan + +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly +import org.apache.rocketmq.common.message.MessageExt + +class TracingMessageListener implements MessageListenerOrderly { + @Override + ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { + runInternalSpan("messageListener") + return ConsumeOrderlyStatus.SUCCESS + } +} diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/BaseConf.java b/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/BaseConf.java index d9c19d8548..0230ab6986 100644 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/BaseConf.java +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/BaseConf.java @@ -8,12 +8,12 @@ package base; import java.util.UUID; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.MessageListener; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.test.listener.AbstractListener; import org.apache.rocketmq.test.util.MQRandomUtils; import org.apache.rocketmq.test.util.RandomUtil; @@ -45,7 +45,7 @@ public final class BaseConf { } public static DefaultMQPushConsumer getConsumer( - String nsAddr, String topic, String subExpression, AbstractListener listener) + String nsAddr, String topic, String subExpression, MessageListener listener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); consumer.setInstanceName(RandomUtil.getStringByUUID());