Convert rocketmq unit test from groovy to java (#10520)

This commit is contained in:
Steve Rao 2024-02-21 01:02:24 +08:00 committed by GitHub
parent 3b15791d49
commit b14f1fe929
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 563 additions and 456 deletions

View File

@ -1,22 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v4_8
import io.opentelemetry.instrumentation.test.AgentTestTrait
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.producer.DefaultMQProducer
class RocketMqClientTest extends AbstractRocketMqClientTest implements AgentTestTrait {
@Override
void configureMQProducer(DefaultMQProducer producer) {
}
@Override
void configureMQPushConsumer(DefaultMQPushConsumer consumer) {
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v4_8;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.junit.jupiter.api.extension.RegisterExtension;
class RocketMqClientTest extends AbstractRocketMqClientTest {
@RegisterExtension
private static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
InstrumentationExtension testing() {
return testing;
}
@Override
void configureMqProducer(DefaultMQProducer producer) {}
@Override
void configureMqPushConsumer(DefaultMQPushConsumer consumer) {}
}

View File

@ -1,32 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v4_8
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.producer.DefaultMQProducer
import static java.util.Collections.singletonList
class RocketMqClientTest extends AbstractRocketMqClientTest implements LibraryTestTrait {
@Override
void configureMQProducer(DefaultMQProducer producer) {
producer.getDefaultMQProducerImpl().registerSendMessageHook(RocketMqTelemetry.builder(openTelemetry)
.setCapturedHeaders(singletonList("test-message-header"))
.setCaptureExperimentalSpanAttributes(true)
.build().newTracingSendMessageHook())
}
@Override
void configureMQPushConsumer(DefaultMQPushConsumer consumer) {
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(RocketMqTelemetry.builder(openTelemetry)
.setCapturedHeaders(singletonList("test-message-header"))
.setCaptureExperimentalSpanAttributes(true)
.build().newTracingConsumeMessageHook())
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v4_8;
import static java.util.Collections.singletonList;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.junit.jupiter.api.extension.RegisterExtension;
class RocketMqClientTest extends AbstractRocketMqClientTest {
@RegisterExtension
private static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
@Override
InstrumentationExtension testing() {
return testing;
}
@Override
@SuppressWarnings("deprecation")
// testing instrumentation of deprecated class
void configureMqProducer(DefaultMQProducer producer) {
producer
.getDefaultMQProducerImpl()
.registerSendMessageHook(
RocketMqTelemetry.builder(testing.getOpenTelemetry())
.setCapturedHeaders(singletonList("test-message-header"))
.setCaptureExperimentalSpanAttributes(true)
.build()
.newTracingSendMessageHook());
}
@Override
@SuppressWarnings("deprecation")
// testing instrumentation of deprecated class
void configureMqPushConsumer(DefaultMQPushConsumer consumer) {
consumer
.getDefaultMQPushConsumerImpl()
.registerConsumeMessageHook(
RocketMqTelemetry.builder(testing.getOpenTelemetry())
.setCapturedHeaders(singletonList("test-message-header"))
.setCaptureExperimentalSpanAttributes(true)
.build()
.newTracingConsumeMessageHook());
}
}

View File

@ -1,359 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v4_8
import io.opentelemetry.instrumentation.rocketmqclient.v4_8.base.BaseConf
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.semconv.SemanticAttributes
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.client.producer.SendCallback
import org.apache.rocketmq.client.producer.SendResult
import org.apache.rocketmq.client.producer.SendStatus
import org.apache.rocketmq.common.message.Message
import org.apache.rocketmq.remoting.common.RemotingHelper
import spock.lang.Shared
import spock.lang.Unroll
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
//TODO add tests for propagationEnabled flag
@Unroll
abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
@Shared
DefaultMQProducer producer
@Shared
DefaultMQPushConsumer consumer
@Shared
String sharedTopic
@Shared
Message msg
@Shared
def msgs = new ArrayList<Message>()
@Shared
TracingMessageListener tracingMessageListener = new TracingMessageListener()
abstract void configureMQProducer(DefaultMQProducer producer)
abstract void configureMQPushConsumer(DefaultMQPushConsumer consumer)
def setupSpec() {
sharedTopic = BaseConf.initTopic()
msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET))
Message msg1 = new Message(sharedTopic, "TagA", ("hello world a").getBytes())
Message msg2 = new Message(sharedTopic, "TagB", ("hello world b").getBytes())
msgs.add(msg1)
msgs.add(msg2)
producer = BaseConf.getProducer(BaseConf.nsAddr)
configureMQProducer(producer)
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", tracingMessageListener)
configureMQPushConsumer(consumer)
// for RocketMQ 5.x wait a bit to ensure that consumer is properly started up
if (Boolean.getBoolean("testLatestDeps")) {
Thread.sleep(30_000)
}
}
def cleanupSpec() {
producer?.shutdown()
consumer?.shutdown()
BaseConf.deleteTempDir()
}
def setup() {
tracingMessageListener.reset()
}
def "test rocketmq produce callback"() {
CompletableFuture<SendResult> result = new CompletableFuture<>()
when:
producer.send(msg, new SendCallback() {
@Override
void onSuccess(SendResult sendResult) {
result.complete(sendResult)
}
@Override
void onException(Throwable throwable) {
result.completeExceptionally(throwable)
}
})
result.get(10, TimeUnit.SECONDS).sendStatus == SendStatus.SEND_OK
// waiting longer than assertTraces below does on its own because of CI flakiness
tracingMessageListener.waitForMessages()
then:
assertTraces(1) {
trace(0, 3) {
span(0) {
name sharedTopic + " publish"
kind PRODUCER
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA"
"messaging.rocketmq.broker_address" String
"messaging.rocketmq.send_result" "SEND_OK"
}
}
span(1) {
name sharedTopic + " process"
kind CONSUMER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA"
"messaging.rocketmq.broker_address" String
"messaging.rocketmq.queue_id" Long
"messaging.rocketmq.queue_offset" Long
}
}
span(2) {
name "messageListener"
kind INTERNAL
childOf span(1)
}
}
}
}
def "test rocketmq produce and consume"() {
when:
runWithSpan("parent") {
SendResult sendResult = producer.send(msg)
assert sendResult.sendStatus == SendStatus.SEND_OK
}
// waiting longer than assertTraces below does on its own because of CI flakiness
tracingMessageListener.waitForMessages()
then:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
}
span(1) {
name sharedTopic + " publish"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA"
"messaging.rocketmq.broker_address" String
"messaging.rocketmq.send_result" "SEND_OK"
}
}
span(2) {
name sharedTopic + " process"
kind CONSUMER
childOf span(1)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA"
"messaging.rocketmq.broker_address" String
"messaging.rocketmq.queue_id" Long
"messaging.rocketmq.queue_offset" Long
}
}
span(3) {
name "messageListener"
kind INTERNAL
childOf span(2)
}
}
}
}
def "test rocketmq produce and batch consume"() {
setup:
consumer.setConsumeMessageBatchMaxSize(2)
when:
// This test assumes that messages are sent and received as a batch. Occasionally it happens
// that the messages are not received as a batch, but one by one. This doesn't match what the
// assertion expects. To reduce flakiness we retry the test when messages weren't received as
// a batch.
def maxAttempts = 5
for (i in 1..maxAttempts) {
tracingMessageListener.reset()
runWithSpan("parent") {
producer.send(msgs)
}
tracingMessageListener.waitForMessages()
if (tracingMessageListener.getLastBatchSize() == 2) {
break
} else if (i < maxAttempts) {
// if messages weren't received as a batch we get 1 trace instead of 2
ignoreTracesAndClear(1)
System.err.println("Messages weren't received as batch, retrying")
}
}
then:
assertTraces(2) {
def producerSpan = null
trace(0, 2) {
producerSpan = span(1)
span(0) {
name "parent"
kind INTERNAL
}
span(1) {
name sharedTopic + " publish"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"messaging.rocketmq.broker_address" String
"messaging.rocketmq.send_result" "SEND_OK"
}
}
}
trace(1, 4) {
span(0) {
name "multiple_sources receive"
kind CONSUMER
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
}
}
span(1) {
name sharedTopic + " process"
kind CONSUMER
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA"
"messaging.rocketmq.broker_address" String
"messaging.rocketmq.queue_id" Long
"messaging.rocketmq.queue_offset" Long
}
childOf span(0)
hasLink producerSpan
}
span(2) {
name sharedTopic + " process"
kind CONSUMER
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagB"
"messaging.rocketmq.broker_address" String
"messaging.rocketmq.queue_id" Long
"messaging.rocketmq.queue_offset" Long
}
childOf span(0)
hasLink producerSpan
}
span(3) {
name "messageListener"
kind INTERNAL
childOf span(0)
}
}
}
}
def "capture message header as span attributes"() {
when:
runWithSpan("parent") {
def msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET))
msg.putUserProperty("test-message-header", "test")
SendResult sendResult = producer.send(msg)
assert sendResult.sendStatus == SendStatus.SEND_OK
}
// waiting longer than assertTraces below does on its own because of CI flakiness
tracingMessageListener.waitForMessages()
then:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
}
span(1) {
name sharedTopic + " publish"
kind PRODUCER
childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA"
"messaging.rocketmq.broker_address" String
"messaging.rocketmq.send_result" "SEND_OK"
"messaging.header.test_message_header" { it == ["test"] }
}
}
span(2) {
name sharedTopic + " process"
kind CONSUMER
childOf span(1)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA"
"messaging.rocketmq.broker_address" String
"messaging.rocketmq.queue_id" Long
"messaging.rocketmq.queue_offset" Long
"messaging.header.test_message_header" { it == ["test"] }
}
}
span(3) {
name "messageListener"
kind INTERNAL
childOf span(2)
}
}
}
}
}

View File

@ -1,43 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v4_8
import java.util.concurrent.TimeUnit
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly
import org.apache.rocketmq.common.message.MessageExt
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan
class TracingMessageListener implements MessageListenerOrderly {
private AtomicInteger lastBatchSize = new AtomicInteger()
private CountDownLatch messageReceived = new CountDownLatch(1)
@Override
ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
lastBatchSize.set(list.size())
messageReceived.countDown()
runWithSpan("messageListener") {}
return ConsumeOrderlyStatus.SUCCESS
}
void reset() {
messageReceived = new CountDownLatch(1)
lastBatchSize.set(0)
}
void waitForMessages() {
messageReceived.await(30, TimeUnit.SECONDS)
}
int getLastBatchSize() {
return lastBatchSize.get()
}
}

View File

@ -0,0 +1,436 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v4_8;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.rocketmqclient.v4_8.base.BaseConf;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.semconv.SemanticAttributes;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** TODO add tests for propagationEnabled flag */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
abstract class AbstractRocketMqClientTest {
private static final Logger logger = LoggerFactory.getLogger(AbstractRocketMqClientTest.class);
private DefaultMQProducer producer;
private DefaultMQPushConsumer consumer;
private String sharedTopic;
private Message msg;
private final List<Message> msgs = new ArrayList<>();
private final TracingMessageListener tracingMessageListener = new TracingMessageListener();
abstract InstrumentationExtension testing();
abstract void configureMqProducer(DefaultMQProducer producer);
abstract void configureMqPushConsumer(DefaultMQPushConsumer consumer);
@BeforeAll
void setup() throws MQClientException, InterruptedException {
sharedTopic = BaseConf.initTopic();
msg = new Message(sharedTopic, "TagA", "Hello RocketMQ".getBytes(Charset.defaultCharset()));
Message msg1 =
new Message(sharedTopic, "TagA", "hello world a".getBytes(Charset.defaultCharset()));
Message msg2 =
new Message(sharedTopic, "TagB", "hello world b".getBytes(Charset.defaultCharset()));
msgs.add(msg1);
msgs.add(msg2);
producer = BaseConf.getProducer(BaseConf.nsAddr);
configureMqProducer(producer);
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", tracingMessageListener);
configureMqPushConsumer(consumer);
// for RocketMQ 5.x wait a bit to ensure that consumer is properly started up
if (Boolean.getBoolean("testLatestDeps")) {
Thread.sleep(30_000);
}
}
@AfterAll
void cleanup() {
if (producer != null) {
producer.shutdown();
}
if (consumer != null) {
consumer.shutdown();
}
BaseConf.deleteTempDir();
}
@BeforeEach
void resetTest() {
tracingMessageListener.reset();
}
@Test
void testRocketmqProduceCallback()
throws RemotingException,
InterruptedException,
MQClientException,
ExecutionException,
TimeoutException {
CompletableFuture<SendResult> result = new CompletableFuture<>();
producer.send(
msg,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
result.complete(sendResult);
}
@Override
public void onException(Throwable throwable) {
result.completeExceptionally(throwable);
}
});
SendResult sendResult = result.get(10, TimeUnit.SECONDS);
assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus(), "Send status should be SEND_OK");
// waiting longer than assertTraces below does on its own because of CI flakiness
tracingMessageListener.waitForMessages();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName(sharedTopic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(String.class)),
equalTo(
AttributeKey.stringKey("messaging.rocketmq.send_result"),
"SEND_OK")),
span ->
span.hasName(sharedTopic + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
val -> val.isInstanceOf(Long.class)),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(String.class)),
satisfies(
AttributeKey.longKey("messaging.rocketmq.queue_id"),
val -> val.isInstanceOf(Long.class)),
satisfies(
AttributeKey.longKey("messaging.rocketmq.queue_offset"),
val -> val.isInstanceOf(Long.class))),
span ->
span.hasName("messageListener")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(1))));
}
@Test
void testRocketmqProduceAndConsume() throws Exception {
testing()
.runWithSpan(
"parent",
() -> {
SendResult sendResult = producer.send(msg);
assertEquals(
SendStatus.SEND_OK, sendResult.getSendStatus(), "Send status should be SEND_OK");
});
// waiting longer than assertTraces below does on its own because of CI flakiness
tracingMessageListener.waitForMessages();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
span ->
span.hasName(sharedTopic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(String.class)),
equalTo(
AttributeKey.stringKey("messaging.rocketmq.send_result"),
"SEND_OK")),
span ->
span.hasName(sharedTopic + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
val -> val.isInstanceOf(Long.class)),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(String.class)),
satisfies(
AttributeKey.longKey("messaging.rocketmq.queue_id"),
val -> val.isInstanceOf(Long.class)),
satisfies(
AttributeKey.longKey("messaging.rocketmq.queue_offset"),
val -> val.isInstanceOf(Long.class))),
span ->
span.hasName("messageListener")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(2))));
}
@Test
void testRocketmqProduceAndBatchConsume() throws Exception {
consumer.setConsumeMessageBatchMaxSize(2);
// This test assumes that messages are sent and received as a batch. Occasionally it happens
// that the messages are not received as a batch, but one by one. This doesn't match what the
// assertion expects. To reduce flakiness we retry the test when messages weren't received as
// a batch.
int maxAttempts = 5;
for (int i = 0; i < maxAttempts; i++) {
tracingMessageListener.reset();
testing().runWithSpan("parent", () -> producer.send(msgs));
tracingMessageListener.waitForMessages();
if (tracingMessageListener.getLastBatchSize() == 2) {
break;
} else if (i < maxAttempts) {
// if messages weren't received as a batch we get 1 trace instead of 2
testing().waitForTraces(1);
testing().clearData();
logger.error("Messages weren't received as batch, retrying");
}
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
span ->
span.hasName(sharedTopic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(String.class)),
equalTo(
AttributeKey.stringKey("messaging.rocketmq.send_result"),
"SEND_OK"))),
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("multiple_sources receive")
.hasKind(SpanKind.CONSUMER)
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")),
span ->
span.hasName(sharedTopic + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(trace.getSpan(1).getSpanContext()))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
val -> val.isInstanceOf(Long.class)),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(
SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(Long.class)),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.queue_id"),
val -> val.isInstanceOf(Long.class)),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.queue_offset"),
val -> val.isInstanceOf(Long.class))),
span ->
span.hasName(sharedTopic + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(trace.getSpan(1).getSpanContext()))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
val -> val.isInstanceOf(Long.class)),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(
SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(String.class)),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.queue_id"),
val -> val.isInstanceOf(Long.class)),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.queue_offset"),
val -> val.isInstanceOf(Long.class))),
span ->
span.hasName("messageListener")
.hasParent(trace.getSpan(0))
.hasKind(SpanKind.INTERNAL)));
}
}
@Test
void captureMessageHeaderAsSpanAttributes() throws Exception {
tracingMessageListener.reset();
testing()
.runWithSpan(
"parent",
() -> {
Message msg =
new Message(
sharedTopic, "TagA", "Hello RocketMQ".getBytes(Charset.defaultCharset()));
msg.putUserProperty("test-message-header", "test");
SendResult sendResult = producer.send(msg);
assertEquals(
SendStatus.SEND_OK, sendResult.getSendStatus(), "Send status should be SEND_OK");
});
// waiting longer than assertTraces below does on its own because of CI flakiness
tracingMessageListener.waitForMessages();
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
span ->
span.hasName(sharedTopic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(String.class)),
equalTo(
AttributeKey.stringKey("messaging.rocketmq.send_result"),
"SEND_OK"),
equalTo(
AttributeKey.stringArrayKey(
"messaging.header.test_message_header"),
singletonList("test"))),
span ->
span.hasName(sharedTopic + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, sharedTopic),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_BODY_SIZE,
val -> val.isInstanceOf(Long.class)),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG, "TagA"),
satisfies(
AttributeKey.stringKey("messaging.rocketmq.broker_address"),
val -> val.isInstanceOf(String.class)),
satisfies(
AttributeKey.longKey("messaging.rocketmq.queue_id"),
val -> val.isInstanceOf(Long.class)),
satisfies(
AttributeKey.longKey("messaging.rocketmq.queue_offset"),
val -> val.isInstanceOf(Long.class)),
equalTo(
AttributeKey.stringArrayKey(
"messaging.header.test_message_header"),
singletonList("test"))),
span ->
span.hasName("messageListener")
.hasParent(trace.getSpan(2))
.hasKind(SpanKind.INTERNAL)));
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v4_8;
import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
class TracingMessageListener implements MessageListenerOrderly {
private final AtomicInteger lastBatchSize = new AtomicInteger();
private CountDownLatch messageReceived = new CountDownLatch(1);
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
lastBatchSize.set(list.size());
messageReceived.countDown();
runWithSpan("messageListener", () -> {});
return ConsumeOrderlyStatus.SUCCESS;
}
void reset() {
messageReceived = new CountDownLatch(1);
lastBatchSize.set(0);
}
void waitForMessages() throws InterruptedException {
messageReceived.await(30, TimeUnit.SECONDS);
}
int getLastBatchSize() {
return lastBatchSize.get();
}
}