Implement producer part of RocketMQ new client instrumentation (#6884)

Fix #6764 . This pull request is about the producer part.
This commit is contained in:
Aaron Ai 2022-10-28 10:25:22 +08:00 committed by GitHub
parent cd95517ddc
commit 029ed3d98b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 745 additions and 2 deletions

View File

@ -1,4 +1,4 @@
# Settings for the Apache RocketMQ Remoting-based client instrumentation
# Settings for the Apache RocketMQ remoting-based client instrumentation
| System property | Type | Default | Description |
|---|---|---|---|

View File

@ -1,4 +1,4 @@
# Library Instrumentation for Apache RocketMQ Remoting-based Client 4.0.0+
# Library Instrumentation for Apache RocketMQ remoting-based client 4.0.0+
Provides OpenTelemetry instrumentation for [Apache RocketMQ](https://rocketmq.apache.org/) remoting-based client.

View File

@ -0,0 +1,18 @@
plugins {
id("otel.javaagent-instrumentation")
}
muzzle {
pass {
group.set("org.apache.rocketmq")
module.set("rocketmq-client-java")
versions.set("[5.0.0,)")
assertInverse.set(true)
}
}
dependencies {
library("org.apache.rocketmq:rocketmq-client-java:5.0.0")
testImplementation(project(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-5.0:testing"))
}

View File

@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture;
/** Future converter, which covert future of list into list of future. */
public class FutureConverter {
private FutureConverter() {}
public static <T> List<SettableFuture<T>> convert(SettableFuture<List<T>> future, int num) {
List<SettableFuture<T>> futures = new ArrayList<>(num);
for (int i = 0; i < num; i++) {
SettableFuture<T> f = SettableFuture.create();
futures.add(f);
}
ListFutureCallback<T> futureCallback = new ListFutureCallback<>(futures);
Futures.addCallback(future, futureCallback, MoreExecutors.directExecutor());
return futures;
}
public static class ListFutureCallback<T> implements FutureCallback<List<T>> {
private final List<SettableFuture<T>> futures;
public ListFutureCallback(List<SettableFuture<T>> futures) {
this.futures = futures;
}
@Override
public void onSuccess(List<T> result) {
for (int i = 0; i < result.size(); i++) {
futures.get(i).set(result.get(i));
}
}
@Override
public void onFailure(Throwable t) {
for (SettableFuture<T> future : futures) {
future.setException(t);
}
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.context.propagation.TextMapSetter;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
enum MapSetter implements TextMapSetter<PublishingMessageImpl> {
INSTANCE;
@Override
public void set(@Nullable PublishingMessageImpl message, String key, String value) {
if (message == null) {
return;
}
Map<String, String> extraProperties = VirtualFieldStore.getExtraPropertiesByMessage(message);
if (extraProperties == null) {
extraProperties = new HashMap<>();
VirtualFieldStore.setExtraPropertiesByMessage(message, extraProperties);
}
extraProperties.put(key, value);
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import static java.util.Arrays.asList;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;
@AutoService(InstrumentationModule.class)
public final class RocketMqInstrumentationModule extends InstrumentationModule {
public RocketMqInstrumentationModule() {
super("rocketmq-client", "rocketmq-client-5.0");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new RocketMqPublishingMessageImplInstrumentation(), new RocketMqProducerInstrumentation());
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import java.util.List;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
final class RocketMqInstrumenterFactory {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.rocketmq-client-5.0";
private RocketMqInstrumenterFactory() {}
public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> createProducerInstrumenter(
OpenTelemetry openTelemetry, List<String> capturedHeaders) {
RocketMqProducerAttributeGetter getter = RocketMqProducerAttributeGetter.INSTANCE;
MessageOperation operation = MessageOperation.SEND;
AttributesExtractor<PublishingMessageImpl, SendReceiptImpl> attributesExtractor =
buildMessagingAttributesExtractor(getter, operation, capturedHeaders);
InstrumenterBuilder<PublishingMessageImpl, SendReceiptImpl> instrumenterBuilder =
Instrumenter.<PublishingMessageImpl, SendReceiptImpl>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE)
.setSpanStatusExtractor(
(spanStatusBuilder, message, sendReceipt, error) -> {
if (null != error) {
spanStatusBuilder.setStatus(StatusCode.ERROR);
}
});
return instrumenterBuilder.buildProducerInstrumenter(MapSetter.INSTANCE);
}
private static <T, R> MessagingAttributesExtractor<T, R> buildMessagingAttributesExtractor(
MessagingAttributesGetter<T, R> getter,
MessageOperation operation,
List<String> capturedHeaders) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
}
}

View File

@ -0,0 +1,55 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.DELAY;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.FIFO;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.TRANSACTION;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import java.util.ArrayList;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
enum RocketMqProducerAttributeExtractor
implements AttributesExtractor<PublishingMessageImpl, SendReceiptImpl> {
INSTANCE;
@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, PublishingMessageImpl message) {
message.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s));
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(message.getKeys()));
switch (message.getMessageType()) {
case FIFO:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, FIFO);
break;
case DELAY:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, DELAY);
break;
case TRANSACTION:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, TRANSACTION);
break;
default:
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL);
}
}
@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
PublishingMessageImpl message,
@Nullable SendReceiptImpl sendReceipt,
@Nullable Throwable error) {}
}

View File

@ -0,0 +1,93 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
enum RocketMqProducerAttributeGetter
implements MessagingAttributesGetter<PublishingMessageImpl, SendReceiptImpl> {
INSTANCE;
@Nullable
@Override
public String system(PublishingMessageImpl message) {
return "rocketmq";
}
@Nullable
@Override
public String destinationKind(PublishingMessageImpl message) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}
@Nullable
@Override
public String destination(PublishingMessageImpl message) {
return message.getTopic();
}
@Override
public boolean temporaryDestination(PublishingMessageImpl message) {
return false;
}
@Nullable
@Override
public String protocol(PublishingMessageImpl message) {
return null;
}
@Nullable
@Override
public String protocolVersion(PublishingMessageImpl message) {
return null;
}
@Nullable
@Override
public String url(PublishingMessageImpl message) {
return null;
}
@Nullable
@Override
public String conversationId(PublishingMessageImpl message) {
return null;
}
@Nullable
@Override
public Long messagePayloadSize(PublishingMessageImpl message) {
return (long) message.getBody().remaining();
}
@Nullable
@Override
public Long messagePayloadCompressedSize(PublishingMessageImpl message) {
return null;
}
@Nullable
@Override
public String messageId(PublishingMessageImpl message, @Nullable SendReceiptImpl sendReceipt) {
return message.getMessageId().toString();
}
@Override
public List<String> header(PublishingMessageImpl message, String name) {
String value = message.getProperties().get(name);
if (value != null) {
return Collections.singletonList(value);
}
return Collections.emptyList();
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPrivate;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.List;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture;
final class RocketMqProducerInstrumentation implements TypeInstrumentation {
/** Match the implementation of RocketMQ producer. */
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.java.impl.producer.ProducerImpl");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(named("send0"))
.and(isPrivate())
.and(takesArguments(6))
.and(
takesArgument(
0,
named(
"org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture")))
.and(takesArgument(1, String.class))
.and(takesArgument(2, named("org.apache.rocketmq.client.java.message.MessageType")))
.and(takesArgument(3, List.class))
.and(takesArgument(4, List.class))
.and(takesArgument(5, int.class)),
RocketMqProducerInstrumentation.class.getName() + "$SendAdvice");
}
@SuppressWarnings("unused")
public static class SendAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) SettableFuture<List<SendReceiptImpl>> future0,
@Advice.Argument(4) List<PublishingMessageImpl> messages) {
Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter =
RocketMqSingletons.producerInstrumenter();
int count = messages.size();
List<SettableFuture<SendReceiptImpl>> futures = FutureConverter.convert(future0, count);
for (int i = 0; i < count; i++) {
PublishingMessageImpl message = messages.get(i);
// Try to extract parent context.
Context parentContext = VirtualFieldStore.getContextByMessage(message);
if (parentContext == null) {
parentContext = Context.current();
}
Span span = Span.fromContext(parentContext);
if (!span.getSpanContext().isValid()) {
parentContext = Context.current();
}
SettableFuture<SendReceiptImpl> future = futures.get(i);
if (!instrumenter.shouldStart(parentContext, message)) {
return;
}
Context context = instrumenter.start(parentContext, message);
Futures.addCallback(
future,
new SpanFinishingCallback(instrumenter, context, message),
MoreExecutors.directExecutor());
}
}
}
public static class SpanFinishingCallback implements FutureCallback<SendReceiptImpl> {
private final Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter;
private final Context context;
private final PublishingMessageImpl message;
public SpanFinishingCallback(
Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter,
Context context,
PublishingMessageImpl message) {
this.instrumenter = instrumenter;
this.context = context;
this.message = message;
}
@Override
public void onSuccess(SendReceiptImpl sendReceipt) {
instrumenter.end(context, message, sendReceipt, null);
}
@Override
public void onFailure(Throwable t) {
instrumenter.end(context, message, null, t);
}
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.java.message.MessageImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf(
"org.apache.rocketmq.client.java.message.PublishingMessageImpl",
"org.apache.rocketmq.client.java.message.MessageImpl");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor()
.and(isPublic())
.and(takesArgument(0, named("org.apache.rocketmq.client.apis.message.Message")))
.and(
takesArgument(
1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings")))
.and(takesArgument(2, boolean.class)),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
transformer.applyAdviceToMethod(
isMethod().and(named("getProperties")).and(isPublic()),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
}
@SuppressWarnings("unused")
public static class ConstructorAdvice {
/**
* The constructor of {@link PublishingMessageImpl} is always called in the same thread that
* user invoke {@link Producer#send(Message)}/{@link Producer#sendAsync(Message)}/{@link
* Producer#send(Message, Transaction)}. Store the {@link Context} here and fetch it in {@link
* RocketMqProducerInstrumentation}.
*/
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.This PublishingMessageImpl message) {
VirtualFieldStore.setContextByMessage(message, Context.current());
}
}
@SuppressWarnings("unused")
public static class GetPropertiesAdvice {
/** Update the message properties to propagate context recorded by {@link MapSetter}. */
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.This MessageImpl messageImpl,
@Advice.Return(readOnly = false) Map<String, String> properties) {
if (!(messageImpl instanceof PublishingMessageImpl)) {
return;
}
PublishingMessageImpl message = (PublishingMessageImpl) messageImpl;
Map<String, String> extraProperties = VirtualFieldStore.getExtraPropertiesByMessage(message);
if (extraProperties != null) {
properties.putAll(extraProperties);
}
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
public final class RocketMqSingletons {
private static final Instrumenter<PublishingMessageImpl, SendReceiptImpl> PRODUCER_INSTRUMENTER;
static {
PRODUCER_INSTRUMENTER =
RocketMqInstrumenterFactory.createProducerInstrumenter(
GlobalOpenTelemetry.get(), ExperimentalConfig.get().getMessagingHeaders());
}
public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> producerInstrumenter() {
return PRODUCER_INSTRUMENTER;
}
private RocketMqSingletons() {}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.util.Map;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
public class VirtualFieldStore {
private static final VirtualField<PublishingMessageImpl, Context> messageContextField =
VirtualField.find(PublishingMessageImpl.class, Context.class);
private static final VirtualField<PublishingMessageImpl, Map<String, String>>
messageExtraPropertiesField = VirtualField.find(PublishingMessageImpl.class, Map.class);
private VirtualFieldStore() {}
public static Context getContextByMessage(PublishingMessageImpl message) {
return messageContextField.get(message);
}
public static void setContextByMessage(PublishingMessageImpl message, Context context) {
messageContextField.set(message, context);
}
public static Map<String, String> getExtraPropertiesByMessage(PublishingMessageImpl message) {
return messageExtraPropertiesField.get(message);
}
public static void setExtraPropertiesByMessage(
PublishingMessageImpl message, Map<String, String> extraProperties) {
messageExtraPropertiesField.set(message, extraProperties);
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RocketMqClientTest extends AbstractRocketMqClientTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
}

View File

@ -0,0 +1,12 @@
plugins {
id("otel.java-conventions")
}
dependencies {
api(project(":testing-common"))
// earlier versions have bugs that may make tests flaky.
implementation("org.apache.rocketmq:rocketmq-client-java:5.0.2")
implementation("org.testcontainers:testcontainers:1.17.5")
implementation("io.opentelemetry:opentelemetry-api")
}

View File

@ -0,0 +1,108 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v5_0;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_ID;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
public abstract class AbstractRocketMqClientTest {
protected abstract InstrumentationExtension testing();
// TODO(aaron-ai): replace it by the official image.
private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0";
// We still need this container type to do fixed-port-mapping.
@SuppressWarnings({"deprecation", "rawtypes", "resource"})
@Test
public void testSendMessage() throws ClientException {
int proxyPort = PortUtils.findOpenPorts(4);
int brokerPort = proxyPort + 1;
int brokerHaPort = proxyPort + 2;
int namesrvPort = proxyPort + 3;
try (GenericContainer<?> container =
new FixedHostPortGenericContainer(IMAGE_NAME)
.withFixedExposedPort(proxyPort, proxyPort)
.withEnv("rocketmq.broker.port", String.valueOf(brokerPort))
.withEnv("rocketmq.proxy.port", String.valueOf(proxyPort))
.withEnv("rocketmq.broker.ha.port", String.valueOf(brokerHaPort))
.withEnv("rocketmq.namesrv.port", String.valueOf(namesrvPort))
.withExposedPorts(proxyPort)) {
// Start the container.
container.start();
String endpoints = "127.0.0.1:" + proxyPort;
ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder().setEndpoints(endpoints).build();
// Inner topic of the container.
String topic = "normal-topic-0";
ClientServiceProvider provider = ClientServiceProvider.loadService();
Producer producer =
provider
.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.build();
String tag = "tagA";
String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"};
byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
Message message =
provider
.newMessageBuilder()
.setTopic(topic)
.setTag(tag)
.setKeys(keys)
.setBody(body)
.build();
SendReceipt sendReceipt = producer.send(message);
testing()
.waitAndAssertTraces(
traceAssert ->
traceAssert.hasSpansSatisfyingExactly(
spanDataAssert ->
spanDataAssert
.hasName(topic + " send")
.hasStatus(StatusData.unset())
.hasAttributesSatisfyingExactly(
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL),
equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
equalTo(MESSAGING_SYSTEM, "rocketmq"),
equalTo(
MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()),
equalTo(
MESSAGING_DESTINATION_KIND,
SemanticAttributes.MessagingDestinationKindValues.TOPIC),
equalTo(MESSAGING_DESTINATION, topic))));
}
}
}

View File

@ -406,6 +406,8 @@ include(":instrumentation:rmi:javaagent")
include(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-4.8:javaagent")
include(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-4.8:library")
include(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-4.8:testing")
include(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-5.0:javaagent")
include(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-5.0:testing")
include(":instrumentation:runtime-metrics:javaagent")
include(":instrumentation:runtime-metrics:library")
include(":instrumentation:rxjava:rxjava-1.0:library")