Support more semantic convention for RocketMQ trace (#7871)

Fixes #7858
This commit is contained in:
Aaron Ai 2023-03-02 04:48:42 +08:00 committed by GitHub
parent ac709f4957
commit 88352fb8d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 286 additions and 18 deletions

View File

@ -6,6 +6,8 @@
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_GROUP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG;
@ -25,6 +27,12 @@ enum RocketMqConsumerProcessAttributeExtractor
public void onStart(
AttributesBuilder attributes, Context parentContext, MessageView messageView) {
messageView.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s));
messageView
.getMessageGroup()
.ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_GROUP, s));
messageView
.getDeliveryTimestamp()
.ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP, s));
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(messageView.getKeys()));
attributes.put(
MESSAGING_ROCKETMQ_CLIENT_GROUP, VirtualFieldStore.getConsumerGroupByMessage(messageView));

View File

@ -5,6 +5,8 @@
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_GROUP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE;
@ -29,6 +31,10 @@ enum RocketMqProducerAttributeExtractor
public void onStart(
AttributesBuilder attributes, Context parentContext, PublishingMessageImpl message) {
message.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s));
message.getMessageGroup().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_GROUP, s));
message
.getDeliveryTimestamp()
.ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP, s));
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(message.getKeys()));
switch (message.getMessageType()) {
case FIFO:

View File

@ -13,10 +13,14 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSA
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_GROUP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.DELAY;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.FIFO;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL;
import io.opentelemetry.api.common.AttributeKey;
@ -34,7 +38,9 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
@ -56,9 +62,11 @@ import org.junit.jupiter.api.TestInstance;
public abstract class AbstractRocketMqClientTest {
// Inner topic of the container.
private static final String topic = "normal-topic-0";
private static final String normalTopic = "normal-topic-0";
private static final String fifoTopic = "fifo-topic-0";
private static final String delayTopic = "delay-topic-0";
private static final String tag = "tagA";
private static final String consumerGroup = "group-normal-topic-0";
private static final String consumerGroup = "group-0";
private static final RocketMqProxyContainer container = new RocketMqProxyContainer();
private final ClientServiceProvider provider = ClientServiceProvider.loadService();
@ -73,12 +81,16 @@ public abstract class AbstractRocketMqClientTest {
ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build();
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
Map<String, FilterExpression> subscriptionExpressions = new HashMap<>();
subscriptionExpressions.put(normalTopic, filterExpression);
subscriptionExpressions.put(fifoTopic, filterExpression);
subscriptionExpressions.put(delayTopic, filterExpression);
consumer =
provider
.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setSubscriptionExpressions(subscriptionExpressions)
.setMessageListener(
messageView -> {
testing().runWithSpan("messageListener", () -> {});
@ -89,7 +101,7 @@ public abstract class AbstractRocketMqClientTest {
provider
.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.setTopics(normalTopic)
.build();
}
@ -106,13 +118,13 @@ public abstract class AbstractRocketMqClientTest {
}
@Test
void testSendAndConsumeMessage() throws Throwable {
void testSendAndConsumeNormalMessage() throws Throwable {
String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"};
byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
Message message =
provider
.newMessageBuilder()
.setTopic(topic)
.setTopic(normalTopic)
.setTag(tag)
.setKeys(keys)
.setBody(body)
@ -130,18 +142,18 @@ public abstract class AbstractRocketMqClientTest {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
assertProducerSpan(span, topic, tag, keys, body, sendReceipt)
assertProducerSpan(span, normalTopic, tag, keys, body, sendReceipt)
.hasParent(trace.getSpan(0)));
sendSpanData.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span -> assertReceiveSpan(span, topic, consumerGroup),
span -> assertReceiveSpan(span, normalTopic, consumerGroup),
span ->
assertProcessSpan(
span,
sendSpanData.get(),
topic,
normalTopic,
consumerGroup,
tag,
keys,
@ -162,7 +174,7 @@ public abstract class AbstractRocketMqClientTest {
Message message =
provider
.newMessageBuilder()
.setTopic(topic)
.setTopic(normalTopic)
.setTag(tag)
.setKeys(keys)
.setBody(body)
@ -188,19 +200,19 @@ public abstract class AbstractRocketMqClientTest {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent"),
span ->
assertProducerSpan(span, topic, tag, keys, body, sendReceipt)
assertProducerSpan(span, normalTopic, tag, keys, body, sendReceipt)
.hasParent(trace.getSpan(0)),
span -> span.hasName("child"));
sendSpanData.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span -> assertReceiveSpan(span, topic, consumerGroup),
span -> assertReceiveSpan(span, normalTopic, consumerGroup),
span ->
assertProcessSpan(
span,
sendSpanData.get(),
topic,
normalTopic,
consumerGroup,
tag,
keys,
@ -214,6 +226,114 @@ public abstract class AbstractRocketMqClientTest {
.hasParent(trace.getSpan(1))));
}
@Test
public void testSendAndConsumeFifoMessage() throws Throwable {
String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"};
byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
String messageGroup = "yourMessageGroup";
Message message =
provider
.newMessageBuilder()
.setTopic(fifoTopic)
.setTag(tag)
.setKeys(keys)
.setMessageGroup(messageGroup)
.setBody(body)
.build();
SendReceipt sendReceipt =
testing()
.runWithSpan(
"parent", (ThrowingSupplier<SendReceipt, Throwable>) () -> producer.send(message));
AtomicReference<SpanData> sendSpanData = new AtomicReference<>();
testing()
.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
assertProducerSpanWithFifoMessage(
span, fifoTopic, tag, keys, messageGroup, body, sendReceipt)
.hasParent(trace.getSpan(0)));
sendSpanData.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span -> assertReceiveSpan(span, fifoTopic, consumerGroup),
span ->
assertProcessSpanWithFifoMessage(
span,
sendSpanData.get(),
fifoTopic,
consumerGroup,
tag,
keys,
messageGroup,
body,
sendReceipt)
// As the child of receive span.
.hasParent(trace.getSpan(0)),
span ->
span.hasName("messageListener")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(1))));
}
@Test
public void testSendAndConsumeDelayMessage() throws Throwable {
String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"};
byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
long deliveryTimestamp = System.currentTimeMillis();
Message message =
provider
.newMessageBuilder()
.setTopic(delayTopic)
.setTag(tag)
.setKeys(keys)
.setDeliveryTimestamp(deliveryTimestamp)
.setBody(body)
.build();
SendReceipt sendReceipt =
testing()
.runWithSpan(
"parent", (ThrowingSupplier<SendReceipt, Throwable>) () -> producer.send(message));
AtomicReference<SpanData> sendSpanData = new AtomicReference<>();
testing()
.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
assertProducerSpanWithDelayMessage(
span, delayTopic, tag, keys, deliveryTimestamp, body, sendReceipt)
.hasParent(trace.getSpan(0)));
sendSpanData.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span -> assertReceiveSpan(span, delayTopic, consumerGroup),
span ->
assertProcessSpanWithDelayMessage(
span,
sendSpanData.get(),
delayTopic,
consumerGroup,
tag,
keys,
deliveryTimestamp,
body,
sendReceipt)
// As the child of receive span.
.hasParent(trace.getSpan(0)),
span ->
span.hasName("messageListener")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(1))));
}
@Test
public void testCapturedMessageHeaders() throws Throwable {
String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"};
@ -221,7 +341,7 @@ public abstract class AbstractRocketMqClientTest {
Message message =
provider
.newMessageBuilder()
.setTopic(topic)
.setTopic(normalTopic)
.setTag(tag)
.setKeys(keys)
.setBody(body)
@ -242,7 +362,7 @@ public abstract class AbstractRocketMqClientTest {
span ->
assertProducerSpan(
span,
topic,
normalTopic,
tag,
keys,
body,
@ -256,12 +376,12 @@ public abstract class AbstractRocketMqClientTest {
},
trace ->
trace.hasSpansSatisfyingExactly(
span -> assertReceiveSpan(span, topic, consumerGroup),
span -> assertReceiveSpan(span, normalTopic, consumerGroup),
span ->
assertProcessSpan(
span,
sendSpanData.get(),
topic,
normalTopic,
consumerGroup,
tag,
keys,
@ -308,6 +428,68 @@ public abstract class AbstractRocketMqClientTest {
.hasAttributesSatisfyingExactly(attributeAssertions);
}
private static SpanDataAssert assertProducerSpanWithFifoMessage(
SpanDataAssert span,
String topic,
String tag,
String[] keys,
String messageGroup,
byte[] body,
SendReceipt sendReceipt,
AttributeAssertion... extraAttributes) {
List<AttributeAssertion> attributeAssertions =
new ArrayList<>(
Arrays.asList(
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_GROUP, messageGroup),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, FIFO),
equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
equalTo(MESSAGING_SYSTEM, "rocketmq"),
equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()),
equalTo(
MESSAGING_DESTINATION_KIND,
SemanticAttributes.MessagingDestinationKindValues.TOPIC),
equalTo(MESSAGING_DESTINATION_NAME, topic)));
attributeAssertions.addAll(Arrays.asList(extraAttributes));
return span.hasKind(SpanKind.PRODUCER)
.hasName(topic + " send")
.hasStatus(StatusData.unset())
.hasAttributesSatisfyingExactly(attributeAssertions);
}
private static SpanDataAssert assertProducerSpanWithDelayMessage(
SpanDataAssert span,
String topic,
String tag,
String[] keys,
long deliveryTimestamp,
byte[] body,
SendReceipt sendReceipt,
AttributeAssertion... extraAttributes) {
List<AttributeAssertion> attributeAssertions =
new ArrayList<>(
Arrays.asList(
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP, deliveryTimestamp),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, DELAY),
equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
equalTo(MESSAGING_SYSTEM, "rocketmq"),
equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()),
equalTo(
MESSAGING_DESTINATION_KIND,
SemanticAttributes.MessagingDestinationKindValues.TOPIC),
equalTo(MESSAGING_DESTINATION_NAME, topic)));
attributeAssertions.addAll(Arrays.asList(extraAttributes));
return span.hasKind(SpanKind.PRODUCER)
.hasName(topic + " send")
.hasStatus(StatusData.unset())
.hasAttributesSatisfyingExactly(attributeAssertions);
}
private static SpanDataAssert assertReceiveSpan(
SpanDataAssert span, String topic, String consumerGroup) {
return span.hasKind(SpanKind.CONSUMER)
@ -356,4 +538,76 @@ public abstract class AbstractRocketMqClientTest {
.hasLinks(LinkData.create(linkedSpan.getSpanContext()))
.hasAttributesSatisfyingExactly(attributeAssertions);
}
private static SpanDataAssert assertProcessSpanWithFifoMessage(
SpanDataAssert span,
SpanData linkedSpan,
String topic,
String consumerGroup,
String tag,
String[] keys,
String messageGroup,
byte[] body,
SendReceipt sendReceipt,
AttributeAssertion... extraAttributes) {
List<AttributeAssertion> attributeAssertions =
new ArrayList<>(
Arrays.asList(
equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_GROUP, messageGroup),
equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
equalTo(MESSAGING_SYSTEM, "rocketmq"),
equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()),
equalTo(
MESSAGING_DESTINATION_KIND,
SemanticAttributes.MessagingDestinationKindValues.TOPIC),
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(MESSAGING_OPERATION, "process")));
attributeAssertions.addAll(Arrays.asList(extraAttributes));
return span.hasKind(SpanKind.CONSUMER)
.hasName(topic + " process")
.hasStatus(StatusData.unset())
// Link to send span.
.hasLinks(LinkData.create(linkedSpan.getSpanContext()))
.hasAttributesSatisfyingExactly(attributeAssertions);
}
private static SpanDataAssert assertProcessSpanWithDelayMessage(
SpanDataAssert span,
SpanData linkedSpan,
String topic,
String consumerGroup,
String tag,
String[] keys,
long deliveryTimestamp,
byte[] body,
SendReceipt sendReceipt,
AttributeAssertion... extraAttributes) {
List<AttributeAssertion> attributeAssertions =
new ArrayList<>(
Arrays.asList(
equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_DELIVERY_TIMESTAMP, deliveryTimestamp),
equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
equalTo(MESSAGING_SYSTEM, "rocketmq"),
equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()),
equalTo(
MESSAGING_DESTINATION_KIND,
SemanticAttributes.MessagingDestinationKindValues.TOPIC),
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(MESSAGING_OPERATION, "process")));
attributeAssertions.addAll(Arrays.asList(extraAttributes));
return span.hasKind(SpanKind.CONSUMER)
.hasName(topic + " process")
.hasStatus(StatusData.unset())
// Link to send span.
.hasLinks(LinkData.create(linkedSpan.getSpanContext()))
.hasAttributesSatisfyingExactly(attributeAssertions);
}
}

View File

@ -11,7 +11,7 @@ import org.testcontainers.containers.GenericContainer;
public class RocketMqProxyContainer {
// TODO(aaron-ai): replace it by the official image.
private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0";
private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.1";
private final GenericContainer<?> container;
final String endpoints;