diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/javaagent/src/test/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqClientTest.groovy b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/javaagent/src/test/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqClientTest.groovy deleted file mode 100644 index f59d0859b2..0000000000 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/javaagent/src/test/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqClientTest.groovy +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.rocketmqclient.v4_8 - - -import io.opentelemetry.instrumentation.test.AgentTestTrait -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer -import org.apache.rocketmq.client.producer.DefaultMQProducer - -class RocketMqClientTest extends AbstractRocketMqClientTest implements AgentTestTrait { - - @Override - void configureMQProducer(DefaultMQProducer producer) { - } - - @Override - void configureMQPushConsumer(DefaultMQPushConsumer consumer) { - } -} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqClientTest.java new file mode 100644 index 0000000000..49dacda729 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqClientTest.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v4_8; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.junit.jupiter.api.extension.RegisterExtension; + +class RocketMqClientTest extends AbstractRocketMqClientTest { + + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + InstrumentationExtension testing() { + return testing; + } + + @Override + void configureMqProducer(DefaultMQProducer producer) {} + + @Override + void configureMqPushConsumer(DefaultMQPushConsumer consumer) {} +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqClientTest.groovy b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqClientTest.groovy deleted file mode 100644 index ae1e73136b..0000000000 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqClientTest.groovy +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.rocketmqclient.v4_8 - - -import io.opentelemetry.instrumentation.test.LibraryTestTrait -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer -import org.apache.rocketmq.client.producer.DefaultMQProducer - -import static java.util.Collections.singletonList - -class RocketMqClientTest extends AbstractRocketMqClientTest implements LibraryTestTrait { - - @Override - void configureMQProducer(DefaultMQProducer producer) { - producer.getDefaultMQProducerImpl().registerSendMessageHook(RocketMqTelemetry.builder(openTelemetry) - .setCapturedHeaders(singletonList("test-message-header")) - .setCaptureExperimentalSpanAttributes(true) - .build().newTracingSendMessageHook()) - } - - @Override - void configureMQPushConsumer(DefaultMQPushConsumer consumer) { - consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(RocketMqTelemetry.builder(openTelemetry) - .setCapturedHeaders(singletonList("test-message-header")) - .setCaptureExperimentalSpanAttributes(true) - .build().newTracingConsumeMessageHook()) - } -} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqClientTest.java new file mode 100644 index 0000000000..28b6fcd3c6 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/RocketMqClientTest.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v4_8; + +import static java.util.Collections.singletonList; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.junit.jupiter.api.extension.RegisterExtension; + +class RocketMqClientTest extends AbstractRocketMqClientTest { + + @RegisterExtension + private static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + InstrumentationExtension testing() { + return testing; + } + + @Override + @SuppressWarnings("deprecation") + // testing instrumentation of deprecated class + void configureMqProducer(DefaultMQProducer producer) { + producer + .getDefaultMQProducerImpl() + .registerSendMessageHook( + RocketMqTelemetry.builder(testing.getOpenTelemetry()) + .setCapturedHeaders(singletonList("test-message-header")) + .setCaptureExperimentalSpanAttributes(true) + .build() + .newTracingSendMessageHook()); + } + + @Override + @SuppressWarnings("deprecation") + // testing instrumentation of deprecated class + void configureMqPushConsumer(DefaultMQPushConsumer consumer) { + consumer + .getDefaultMQPushConsumerImpl() + .registerConsumeMessageHook( + RocketMqTelemetry.builder(testing.getOpenTelemetry()) + .setCapturedHeaders(singletonList("test-message-header")) + .setCaptureExperimentalSpanAttributes(true) + .build() + .newTracingConsumeMessageHook()); + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/AbstractRocketMqClientTest.groovy b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/AbstractRocketMqClientTest.groovy deleted file mode 100644 index 946731d843..0000000000 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/AbstractRocketMqClientTest.groovy +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.rocketmqclient.v4_8 - -import io.opentelemetry.instrumentation.rocketmqclient.v4_8.base.BaseConf -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import io.opentelemetry.semconv.SemanticAttributes -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer -import org.apache.rocketmq.client.producer.DefaultMQProducer -import org.apache.rocketmq.client.producer.SendCallback -import org.apache.rocketmq.client.producer.SendResult -import org.apache.rocketmq.client.producer.SendStatus -import org.apache.rocketmq.common.message.Message -import org.apache.rocketmq.remoting.common.RemotingHelper -import spock.lang.Shared -import spock.lang.Unroll - -import java.util.concurrent.CompletableFuture -import java.util.concurrent.TimeUnit - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -//TODO add tests for propagationEnabled flag -@Unroll -abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { - - @Shared - DefaultMQProducer producer - - @Shared - DefaultMQPushConsumer consumer - - @Shared - String sharedTopic - - @Shared - Message msg - - @Shared - def msgs = new ArrayList() - - @Shared - TracingMessageListener tracingMessageListener = new TracingMessageListener() - - abstract void configureMQProducer(DefaultMQProducer producer) - - abstract void configureMQPushConsumer(DefaultMQPushConsumer consumer) - - def setupSpec() { - sharedTopic = BaseConf.initTopic() - msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)) - Message msg1 = new Message(sharedTopic, "TagA", ("hello world a").getBytes()) - Message msg2 = new Message(sharedTopic, "TagB", ("hello world b").getBytes()) - msgs.add(msg1) - msgs.add(msg2) - producer = BaseConf.getProducer(BaseConf.nsAddr) - configureMQProducer(producer) - consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", tracingMessageListener) - configureMQPushConsumer(consumer) - - // for RocketMQ 5.x wait a bit to ensure that consumer is properly started up - if (Boolean.getBoolean("testLatestDeps")) { - Thread.sleep(30_000) - } - } - - def cleanupSpec() { - producer?.shutdown() - consumer?.shutdown() - BaseConf.deleteTempDir() - } - - def setup() { - tracingMessageListener.reset() - } - - def "test rocketmq produce callback"() { - CompletableFuture result = new CompletableFuture<>() - when: - producer.send(msg, new SendCallback() { - @Override - void onSuccess(SendResult sendResult) { - result.complete(sendResult) - } - - @Override - void onException(Throwable throwable) { - result.completeExceptionally(throwable) - } - }) - result.get(10, TimeUnit.SECONDS).sendStatus == SendStatus.SEND_OK - // waiting longer than assertTraces below does on its own because of CI flakiness - tracingMessageListener.waitForMessages() - - then: - assertTraces(1) { - trace(0, 3) { - span(0) { - name sharedTopic + " publish" - kind PRODUCER - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic - "$SemanticAttributes.MESSAGING_OPERATION" "publish" - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - "$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA" - "messaging.rocketmq.broker_address" String - "messaging.rocketmq.send_result" "SEND_OK" - } - } - span(1) { - name sharedTopic + " process" - kind CONSUMER - childOf span(0) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - "$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA" - "messaging.rocketmq.broker_address" String - "messaging.rocketmq.queue_id" Long - "messaging.rocketmq.queue_offset" Long - } - } - span(2) { - name "messageListener" - kind INTERNAL - childOf span(1) - } - } - } - } - - def "test rocketmq produce and consume"() { - when: - runWithSpan("parent") { - SendResult sendResult = producer.send(msg) - assert sendResult.sendStatus == SendStatus.SEND_OK - } - // waiting longer than assertTraces below does on its own because of CI flakiness - tracingMessageListener.waitForMessages() - - then: - assertTraces(1) { - trace(0, 4) { - span(0) { - name "parent" - kind INTERNAL - } - span(1) { - name sharedTopic + " publish" - kind PRODUCER - childOf span(0) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic - "$SemanticAttributes.MESSAGING_OPERATION" "publish" - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - "$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA" - "messaging.rocketmq.broker_address" String - "messaging.rocketmq.send_result" "SEND_OK" - } - } - span(2) { - name sharedTopic + " process" - kind CONSUMER - childOf span(1) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - "$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA" - "messaging.rocketmq.broker_address" String - "messaging.rocketmq.queue_id" Long - "messaging.rocketmq.queue_offset" Long - } - } - span(3) { - name "messageListener" - kind INTERNAL - childOf span(2) - } - } - } - } - - def "test rocketmq produce and batch consume"() { - setup: - consumer.setConsumeMessageBatchMaxSize(2) - - when: - // This test assumes that messages are sent and received as a batch. Occasionally it happens - // that the messages are not received as a batch, but one by one. This doesn't match what the - // assertion expects. To reduce flakiness we retry the test when messages weren't received as - // a batch. - def maxAttempts = 5 - for (i in 1..maxAttempts) { - tracingMessageListener.reset() - - runWithSpan("parent") { - producer.send(msgs) - } - - tracingMessageListener.waitForMessages() - if (tracingMessageListener.getLastBatchSize() == 2) { - break - } else if (i < maxAttempts) { - // if messages weren't received as a batch we get 1 trace instead of 2 - ignoreTracesAndClear(1) - System.err.println("Messages weren't received as batch, retrying") - } - } - - then: - assertTraces(2) { - def producerSpan = null - - trace(0, 2) { - producerSpan = span(1) - - span(0) { - name "parent" - kind INTERNAL - } - span(1) { - name sharedTopic + " publish" - kind PRODUCER - childOf span(0) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic - "$SemanticAttributes.MESSAGING_OPERATION" "publish" - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - "messaging.rocketmq.broker_address" String - "messaging.rocketmq.send_result" "SEND_OK" - } - } - } - - trace(1, 4) { - span(0) { - name "multiple_sources receive" - kind CONSUMER - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" - "$SemanticAttributes.MESSAGING_OPERATION" "receive" - } - } - span(1) { - name sharedTopic + " process" - kind CONSUMER - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - "$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA" - "messaging.rocketmq.broker_address" String - "messaging.rocketmq.queue_id" Long - "messaging.rocketmq.queue_offset" Long - } - childOf span(0) - hasLink producerSpan - } - span(2) { - name sharedTopic + " process" - kind CONSUMER - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - "$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagB" - "messaging.rocketmq.broker_address" String - "messaging.rocketmq.queue_id" Long - "messaging.rocketmq.queue_offset" Long - } - childOf span(0) - hasLink producerSpan - } - span(3) { - name "messageListener" - kind INTERNAL - childOf span(0) - } - } - } - } - - def "capture message header as span attributes"() { - when: - runWithSpan("parent") { - def msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)) - msg.putUserProperty("test-message-header", "test") - SendResult sendResult = producer.send(msg) - assert sendResult.sendStatus == SendStatus.SEND_OK - } - // waiting longer than assertTraces below does on its own because of CI flakiness - tracingMessageListener.waitForMessages() - - then: - assertTraces(1) { - trace(0, 4) { - span(0) { - name "parent" - kind INTERNAL - } - span(1) { - name sharedTopic + " publish" - kind PRODUCER - childOf span(0) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic - "$SemanticAttributes.MESSAGING_OPERATION" "publish" - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - "$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA" - "messaging.rocketmq.broker_address" String - "messaging.rocketmq.send_result" "SEND_OK" - "messaging.header.test_message_header" { it == ["test"] } - } - } - span(2) { - name sharedTopic + " process" - kind CONSUMER - childOf span(1) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - "$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA" - "messaging.rocketmq.broker_address" String - "messaging.rocketmq.queue_id" Long - "messaging.rocketmq.queue_offset" Long - "messaging.header.test_message_header" { it == ["test"] } - } - } - span(3) { - name "messageListener" - kind INTERNAL - childOf span(2) - } - } - } - } -} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingMessageListener.groovy b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingMessageListener.groovy deleted file mode 100644 index b2fa0781be..0000000000 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingMessageListener.groovy +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.rocketmqclient.v4_8 - -import java.util.concurrent.TimeUnit -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus -import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly -import org.apache.rocketmq.common.message.MessageExt - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicInteger - -import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan - -class TracingMessageListener implements MessageListenerOrderly { - private AtomicInteger lastBatchSize = new AtomicInteger() - private CountDownLatch messageReceived = new CountDownLatch(1) - - @Override - ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) { - lastBatchSize.set(list.size()) - messageReceived.countDown() - runWithSpan("messageListener") {} - return ConsumeOrderlyStatus.SUCCESS - } - - void reset() { - messageReceived = new CountDownLatch(1) - lastBatchSize.set(0) - } - - void waitForMessages() { - messageReceived.await(30, TimeUnit.SECONDS) - } - - int getLastBatchSize() { - return lastBatchSize.get() - } -} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/AbstractRocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/AbstractRocketMqClientTest.java new file mode 100644 index 0000000000..d92399db32 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/AbstractRocketMqClientTest.java @@ -0,0 +1,436 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v4_8; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static java.util.Collections.singletonList; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.rocketmqclient.v4_8.base.BaseConf; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.semconv.SemanticAttributes; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** TODO add tests for propagationEnabled flag */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +abstract class AbstractRocketMqClientTest { + + private static final Logger logger = LoggerFactory.getLogger(AbstractRocketMqClientTest.class); + + private DefaultMQProducer producer; + + private DefaultMQPushConsumer consumer; + + private String sharedTopic; + + private Message msg; + + private final List msgs = new ArrayList<>(); + + private final TracingMessageListener tracingMessageListener = new TracingMessageListener(); + + abstract InstrumentationExtension testing(); + + abstract void configureMqProducer(DefaultMQProducer producer); + + abstract void configureMqPushConsumer(DefaultMQPushConsumer consumer); + + @BeforeAll + void setup() throws MQClientException, InterruptedException { + sharedTopic = BaseConf.initTopic(); + msg = new Message(sharedTopic, "TagA", "Hello RocketMQ".getBytes(Charset.defaultCharset())); + Message msg1 = + new Message(sharedTopic, "TagA", "hello world a".getBytes(Charset.defaultCharset())); + Message msg2 = + new Message(sharedTopic, "TagB", "hello world b".getBytes(Charset.defaultCharset())); + msgs.add(msg1); + msgs.add(msg2); + producer = BaseConf.getProducer(BaseConf.nsAddr); + configureMqProducer(producer); + consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", tracingMessageListener); + configureMqPushConsumer(consumer); + + // for RocketMQ 5.x wait a bit to ensure that consumer is properly started up + if (Boolean.getBoolean("testLatestDeps")) { + Thread.sleep(30_000); + } + } + + @AfterAll + void cleanup() { + if (producer != null) { + producer.shutdown(); + } + if (consumer != null) { + consumer.shutdown(); + } + BaseConf.deleteTempDir(); + } + + @BeforeEach + void resetTest() { + tracingMessageListener.reset(); + } + + @Test + void testRocketmqProduceCallback() + throws RemotingException, + InterruptedException, + MQClientException, + ExecutionException, + TimeoutException { + CompletableFuture result = new CompletableFuture<>(); + producer.send( + msg, + new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + result.complete(sendResult); + } + + @Override + public void onException(Throwable throwable) { + result.completeExceptionally(throwable); + } + }); + SendResult sendResult = result.get(10, TimeUnit.SECONDS); + assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus(), "Send status should be SEND_OK"); + // waiting longer than assertTraces below does on its own because of CI flakiness + tracingMessageListener.waitForMessages(); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName(sharedTopic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_ID, + val -> val.isInstanceOf(String.class)), + equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.broker_address"), + val -> val.isInstanceOf(String.class)), + equalTo( + AttributeKey.stringKey("messaging.rocketmq.send_result"), + "SEND_OK")), + span -> + span.hasName(sharedTopic + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE, + val -> val.isInstanceOf(Long.class)), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_ID, + val -> val.isInstanceOf(String.class)), + equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.broker_address"), + val -> val.isInstanceOf(String.class)), + satisfies( + AttributeKey.longKey("messaging.rocketmq.queue_id"), + val -> val.isInstanceOf(Long.class)), + satisfies( + AttributeKey.longKey("messaging.rocketmq.queue_offset"), + val -> val.isInstanceOf(Long.class))), + span -> + span.hasName("messageListener") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); + } + + @Test + void testRocketmqProduceAndConsume() throws Exception { + testing() + .runWithSpan( + "parent", + () -> { + SendResult sendResult = producer.send(msg); + assertEquals( + SendStatus.SEND_OK, sendResult.getSendStatus(), "Send status should be SEND_OK"); + }); + // waiting longer than assertTraces below does on its own because of CI flakiness + tracingMessageListener.waitForMessages(); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL), + span -> + span.hasName(sharedTopic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_ID, + val -> val.isInstanceOf(String.class)), + equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.broker_address"), + val -> val.isInstanceOf(String.class)), + equalTo( + AttributeKey.stringKey("messaging.rocketmq.send_result"), + "SEND_OK")), + span -> + span.hasName(sharedTopic + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE, + val -> val.isInstanceOf(Long.class)), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_ID, + val -> val.isInstanceOf(String.class)), + equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.broker_address"), + val -> val.isInstanceOf(String.class)), + satisfies( + AttributeKey.longKey("messaging.rocketmq.queue_id"), + val -> val.isInstanceOf(Long.class)), + satisfies( + AttributeKey.longKey("messaging.rocketmq.queue_offset"), + val -> val.isInstanceOf(Long.class))), + span -> + span.hasName("messageListener") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)))); + } + + @Test + void testRocketmqProduceAndBatchConsume() throws Exception { + consumer.setConsumeMessageBatchMaxSize(2); + // This test assumes that messages are sent and received as a batch. Occasionally it happens + // that the messages are not received as a batch, but one by one. This doesn't match what the + // assertion expects. To reduce flakiness we retry the test when messages weren't received as + // a batch. + int maxAttempts = 5; + for (int i = 0; i < maxAttempts; i++) { + tracingMessageListener.reset(); + testing().runWithSpan("parent", () -> producer.send(msgs)); + tracingMessageListener.waitForMessages(); + if (tracingMessageListener.getLastBatchSize() == 2) { + break; + } else if (i < maxAttempts) { + // if messages weren't received as a batch we get 1 trace instead of 2 + testing().waitForTraces(1); + testing().clearData(); + logger.error("Messages weren't received as batch, retrying"); + } + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL), + span -> + span.hasName(sharedTopic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_ID, + val -> val.isInstanceOf(String.class)), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.broker_address"), + val -> val.isInstanceOf(String.class)), + equalTo( + AttributeKey.stringKey("messaging.rocketmq.send_result"), + "SEND_OK"))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("multiple_sources receive") + .hasKind(SpanKind.CONSUMER) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")), + span -> + span.hasName(sharedTopic + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(trace.getSpan(1).getSpanContext())) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE, + val -> val.isInstanceOf(Long.class)), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_ID, + val -> val.isInstanceOf(String.class)), + equalTo( + SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.broker_address"), + val -> val.isInstanceOf(Long.class)), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.queue_id"), + val -> val.isInstanceOf(Long.class)), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.queue_offset"), + val -> val.isInstanceOf(Long.class))), + span -> + span.hasName(sharedTopic + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(trace.getSpan(1).getSpanContext())) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"), + equalTo( + SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE, + val -> val.isInstanceOf(Long.class)), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_ID, + val -> val.isInstanceOf(String.class)), + equalTo( + SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.broker_address"), + val -> val.isInstanceOf(String.class)), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.queue_id"), + val -> val.isInstanceOf(Long.class)), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.queue_offset"), + val -> val.isInstanceOf(Long.class))), + span -> + span.hasName("messageListener") + .hasParent(trace.getSpan(0)) + .hasKind(SpanKind.INTERNAL))); + } + } + + @Test + void captureMessageHeaderAsSpanAttributes() throws Exception { + tracingMessageListener.reset(); + testing() + .runWithSpan( + "parent", + () -> { + Message msg = + new Message( + sharedTopic, "TagA", "Hello RocketMQ".getBytes(Charset.defaultCharset())); + msg.putUserProperty("test-message-header", "test"); + SendResult sendResult = producer.send(msg); + assertEquals( + SendStatus.SEND_OK, sendResult.getSendStatus(), "Send status should be SEND_OK"); + }); + // waiting longer than assertTraces below does on its own because of CI flakiness + tracingMessageListener.waitForMessages(); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL), + span -> + span.hasName(sharedTopic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_ID, + val -> val.isInstanceOf(String.class)), + equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.broker_address"), + val -> val.isInstanceOf(String.class)), + equalTo( + AttributeKey.stringKey("messaging.rocketmq.send_result"), + "SEND_OK"), + equalTo( + AttributeKey.stringArrayKey( + "messaging.header.test_message_header"), + singletonList("test"))), + span -> + span.hasName(sharedTopic + " process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE, + val -> val.isInstanceOf(Long.class)), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_ID, + val -> val.isInstanceOf(String.class)), + equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"), + satisfies( + AttributeKey.stringKey("messaging.rocketmq.broker_address"), + val -> val.isInstanceOf(String.class)), + satisfies( + AttributeKey.longKey("messaging.rocketmq.queue_id"), + val -> val.isInstanceOf(Long.class)), + satisfies( + AttributeKey.longKey("messaging.rocketmq.queue_offset"), + val -> val.isInstanceOf(Long.class)), + equalTo( + AttributeKey.stringArrayKey( + "messaging.header.test_message_header"), + singletonList("test"))), + span -> + span.hasName("messageListener") + .hasParent(trace.getSpan(2)) + .hasKind(SpanKind.INTERNAL))); + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingMessageListener.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingMessageListener.java new file mode 100644 index 0000000000..916c9bba00 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingMessageListener.java @@ -0,0 +1,45 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v4_8; + +import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.message.MessageExt; + +class TracingMessageListener implements MessageListenerOrderly { + + private final AtomicInteger lastBatchSize = new AtomicInteger(); + private CountDownLatch messageReceived = new CountDownLatch(1); + + @Override + public ConsumeOrderlyStatus consumeMessage( + List list, ConsumeOrderlyContext consumeOrderlyContext) { + lastBatchSize.set(list.size()); + messageReceived.countDown(); + runWithSpan("messageListener", () -> {}); + return ConsumeOrderlyStatus.SUCCESS; + } + + void reset() { + messageReceived = new CountDownLatch(1); + lastBatchSize.set(0); + } + + void waitForMessages() throws InterruptedException { + messageReceived.await(30, TimeUnit.SECONDS); + } + + int getLastBatchSize() { + return lastBatchSize.get(); + } +}