Implement consumer part of rocketmq new client instrumentation (#7019)

Fixes #6764 , This PR is about the consumer part.

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
Aaron Ai 2022-11-15 20:21:14 +08:00 committed by GitHub
parent 87c7147a25
commit b3cd45685d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1048 additions and 93 deletions

View File

@ -16,3 +16,23 @@ dependencies {
testImplementation(project(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-5.0:testing")) testImplementation(project(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-5.0:testing"))
} }
tasks {
val testReceiveSpanDisabled by registering(Test::class) {
filter {
includeTestsMatching("RocketMqClientSuppressReceiveSpanTest")
}
include("**/RocketMqClientSuppressReceiveSpanTest.*")
}
test {
filter {
excludeTestsMatching("RocketMqClientSuppressReceiveSpanTest")
}
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}
check {
dependsOn(testReceiveSpanDisabled)
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
final class ConsumeServiceInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor()
.and(
isPublic()
.and(
takesArgument(
1, named("org.apache.rocketmq.client.apis.consumer.MessageListener")))),
ConsumeServiceInstrumentation.class.getName() + "$ConstructorAdvice");
}
@SuppressWarnings("unused")
public static class ConstructorAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 1, readOnly = false) MessageListener messageListener) {
// Replace messageListener by wrapper.
if (!(messageListener instanceof MessageListenerWrapper)) {
messageListener = new MessageListenerWrapper(messageListener);
}
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import apache.rocketmq.v2.ReceiveMessageRequest;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
final class ConsumerImplInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(named("receiveMessage"))
.and(takesArguments(3))
.and(takesArgument(0, named("apache.rocketmq.v2.ReceiveMessageRequest")))
.and(takesArgument(1, named("org.apache.rocketmq.client.java.route.MessageQueueImpl")))
.and(takesArgument(2, named("java.time.Duration"))),
ConsumerImplInstrumentation.class.getName() + "$ReceiveMessageAdvice");
}
@SuppressWarnings("unused")
public static class ReceiveMessageAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Timer onStart() {
return Timer.start();
}
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.Argument(0) ReceiveMessageRequest request,
@Advice.Enter Timer timer,
@Advice.Return ListenableFuture<ReceiveMessageResult> future) {
ReceiveSpanFinishingCallback spanFinishingCallback =
new ReceiveSpanFinishingCallback(request, timer);
Futures.addCallback(future, spanFinishingCallback, MoreExecutors.directExecutor());
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.message.MessageView;
public final class MessageListenerWrapper implements MessageListener {
private final MessageListener delegator;
public MessageListenerWrapper(MessageListener delegator) {
this.delegator = delegator;
}
@Override
public ConsumeResult consume(MessageView messageView) {
Context parentContext = VirtualFieldStore.getContextByMessage(messageView);
if (parentContext == null) {
parentContext = Context.current();
}
Instrumenter<MessageView, ConsumeResult> processInstrumenter =
RocketMqSingletons.consumerProcessInstrumenter();
if (!processInstrumenter.shouldStart(parentContext, messageView)) {
return delegator.consume(messageView);
}
Context context = processInstrumenter.start(parentContext, messageView);
ConsumeResult consumeResult = null;
Throwable error = null;
try (Scope ignored = context.makeCurrent()) {
consumeResult = delegator.consume(messageView);
return consumeResult;
} catch (Throwable t) {
error = t;
throw t;
} finally {
processInstrumenter.end(context, messageView, consumeResult, error);
}
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.context.propagation.TextMapGetter;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.apis.message.MessageView;
enum MessageMapGetter implements TextMapGetter<MessageView> {
INSTANCE;
@Override
public Iterable<String> keys(MessageView carrier) {
return carrier.getProperties().keySet();
}
@Nullable
@Override
public String get(@Nullable MessageView carrier, String key) {
return carrier == null ? null : carrier.getProperties().get(key);
}
}

View File

@ -11,7 +11,7 @@ import java.util.Map;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
enum MapSetter implements TextMapSetter<PublishingMessageImpl> { enum MessageMapSetter implements TextMapSetter<PublishingMessageImpl> {
INSTANCE; INSTANCE;
@Override @Override

View File

@ -22,12 +22,11 @@ import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture;
final class RocketMqProducerInstrumentation implements TypeInstrumentation { final class ProducerImplInstrumentation implements TypeInstrumentation {
/** Match the implementation of RocketMQ producer. */ /** Match the implementation of RocketMQ producer. */
@Override @Override
@ -52,7 +51,7 @@ final class RocketMqProducerInstrumentation implements TypeInstrumentation {
.and(takesArgument(3, List.class)) .and(takesArgument(3, List.class))
.and(takesArgument(4, List.class)) .and(takesArgument(4, List.class))
.and(takesArgument(5, int.class)), .and(takesArgument(5, int.class)),
RocketMqProducerInstrumentation.class.getName() + "$SendAdvice"); ProducerImplInstrumentation.class.getName() + "$SendAdvice");
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")
@ -86,34 +85,9 @@ final class RocketMqProducerInstrumentation implements TypeInstrumentation {
Context context = instrumenter.start(parentContext, message); Context context = instrumenter.start(parentContext, message);
Futures.addCallback( Futures.addCallback(
future, future,
new SpanFinishingCallback(instrumenter, context, message), new SendSpanFinishingCallback(instrumenter, context, message),
MoreExecutors.directExecutor()); MoreExecutors.directExecutor());
} }
} }
} }
public static class SpanFinishingCallback implements FutureCallback<SendReceiptImpl> {
private final Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter;
private final Context context;
private final PublishingMessageImpl message;
public SpanFinishingCallback(
Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter,
Context context,
PublishingMessageImpl message) {
this.instrumenter = instrumenter;
this.context = context;
this.message = message;
}
@Override
public void onSuccess(SendReceiptImpl sendReceipt) {
instrumenter.end(context, message, sendReceipt, null);
}
@Override
public void onFailure(Throwable t) {
instrumenter.end(context, message, null, t);
}
}
} }

View File

@ -25,7 +25,7 @@ import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.java.message.MessageImpl; import org.apache.rocketmq.client.java.message.MessageImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrumentation { final class PublishingMessageImplInstrumentation implements TypeInstrumentation {
@Override @Override
public ElementMatcher<TypeDescription> typeMatcher() { public ElementMatcher<TypeDescription> typeMatcher() {
@ -44,10 +44,10 @@ final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrume
takesArgument( takesArgument(
1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings"))) 1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings")))
.and(takesArgument(2, boolean.class)), .and(takesArgument(2, boolean.class)),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice"); PublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
transformer.applyAdviceToMethod( transformer.applyAdviceToMethod(
isMethod().and(named("getProperties")).and(isPublic()), isMethod().and(named("getProperties")).and(isPublic()),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice"); PublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")
@ -56,7 +56,7 @@ final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrume
* The constructor of {@link PublishingMessageImpl} is always called in the same thread that * The constructor of {@link PublishingMessageImpl} is always called in the same thread that
* user invoke {@link Producer#send(Message)}/{@link Producer#sendAsync(Message)}/{@link * user invoke {@link Producer#send(Message)}/{@link Producer#sendAsync(Message)}/{@link
* Producer#send(Message, Transaction)}. Store the {@link Context} here and fetch it in {@link * Producer#send(Message, Transaction)}. Store the {@link Context} here and fetch it in {@link
* RocketMqProducerInstrumentation}. * ProducerImplInstrumentation}.
*/ */
@Advice.OnMethodExit(suppress = Throwable.class) @Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.This PublishingMessageImpl message) { public static void onExit(@Advice.This PublishingMessageImpl message) {
@ -66,7 +66,7 @@ final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrume
@SuppressWarnings("unused") @SuppressWarnings("unused")
public static class GetPropertiesAdvice { public static class GetPropertiesAdvice {
/** Update the message properties to propagate context recorded by {@link MapSetter}. */ /** Update the message properties to propagate context recorded by {@link MessageMapSetter}. */
@Advice.OnMethodExit(suppress = Throwable.class) @Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit( public static void onExit(
@Advice.This MessageImpl messageImpl, @Advice.This MessageImpl messageImpl,

View File

@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import apache.rocketmq.v2.ReceiveMessageRequest;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import java.util.List;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
public final class ReceiveSpanFinishingCallback implements FutureCallback<ReceiveMessageResult> {
private final ReceiveMessageRequest request;
private final Timer timer;
public ReceiveSpanFinishingCallback(ReceiveMessageRequest request, Timer timer) {
this.request = request;
this.timer = timer;
}
@Override
public void onSuccess(ReceiveMessageResult receiveMessageResult) {
List<MessageViewImpl> messageViews = receiveMessageResult.getMessageViewImpls();
// Don't create spans when no messages were received.
if (messageViews.isEmpty()) {
return;
}
String consumerGroup = request.getGroup().getName();
for (MessageViewImpl messageView : messageViews) {
VirtualFieldStore.setConsumerGroupByMessage(messageView, consumerGroup);
}
Instrumenter<ReceiveMessageRequest, List<MessageView>> receiveInstrumenter =
RocketMqSingletons.consumerReceiveInstrumenter();
Context parentContext = Context.current();
if (receiveInstrumenter.shouldStart(parentContext, request)) {
Context context =
InstrumenterUtil.startAndEnd(
receiveInstrumenter,
parentContext,
request,
null,
null,
timer.startTime(),
timer.now());
for (MessageViewImpl messageView : messageViews) {
VirtualFieldStore.setContextByMessage(messageView, context);
}
}
}
@Override
public void onFailure(Throwable throwable) {
Instrumenter<ReceiveMessageRequest, List<MessageView>> receiveInstrumenter =
RocketMqSingletons.consumerReceiveInstrumenter();
Context parentContext = Context.current();
if (receiveInstrumenter.shouldStart(parentContext, request)) {
InstrumenterUtil.startAndEnd(
receiveInstrumenter,
parentContext,
request,
null,
throwable,
timer.startTime(),
timer.now());
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import java.util.ArrayList;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
enum RocketMqConsumerProcessAttributeExtractor
implements AttributesExtractor<MessageView, ConsumeResult> {
INSTANCE;
@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, MessageView messageView) {
messageView.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s));
attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(messageView.getKeys()));
attributes.put(
MESSAGING_ROCKETMQ_CLIENT_GROUP, VirtualFieldStore.getConsumerGroupByMessage(messageView));
}
@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
MessageView messageView,
@Nullable ConsumeResult consumeResult,
@Nullable Throwable error) {}
}

View File

@ -0,0 +1,93 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
enum RocketMqConsumerProcessAttributeGetter
implements MessagingAttributesGetter<MessageView, ConsumeResult> {
INSTANCE;
@Nullable
@Override
public String system(MessageView messageView) {
return "rocketmq";
}
@Nullable
@Override
public String destinationKind(MessageView messageView) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}
@Nullable
@Override
public String destination(MessageView messageView) {
return messageView.getTopic();
}
@Override
public boolean temporaryDestination(MessageView messageView) {
return false;
}
@Nullable
@Override
public String protocol(MessageView messageView) {
return null;
}
@Nullable
@Override
public String protocolVersion(MessageView messageView) {
return null;
}
@Nullable
@Override
public String url(MessageView messageView) {
return null;
}
@Nullable
@Override
public String conversationId(MessageView messageView) {
return null;
}
@Nullable
@Override
public Long messagePayloadSize(MessageView messageView) {
return (long) messageView.getBody().remaining();
}
@Nullable
@Override
public Long messagePayloadCompressedSize(MessageView messageView) {
return null;
}
@Nullable
@Override
public String messageId(MessageView messageView, @Nullable ConsumeResult unused) {
return messageView.getMessageId().toString();
}
@Override
public List<String> header(MessageView messageView, String name) {
String value = messageView.getProperties().get(name);
if (value != null) {
return Collections.singletonList(value);
}
return Collections.emptyList();
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP;
import apache.rocketmq.v2.ReceiveMessageRequest;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.apis.message.MessageView;
enum RocketMqConsumerReceiveAttributeExtractor
implements AttributesExtractor<ReceiveMessageRequest, List<MessageView>> {
INSTANCE;
@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, ReceiveMessageRequest request) {}
@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
ReceiveMessageRequest request,
@Nullable List<MessageView> messageViews,
@Nullable Throwable error) {
String consumerGroup = request.getGroup().getName();
attributes.put(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup);
}
}

View File

@ -0,0 +1,83 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import apache.rocketmq.v2.ReceiveMessageRequest;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.apis.message.MessageView;
enum RocketMqConsumerReceiveAttributeGetter
implements MessagingAttributesGetter<ReceiveMessageRequest, List<MessageView>> {
INSTANCE;
@Nullable
@Override
public String system(ReceiveMessageRequest request) {
return "rocketmq";
}
@Nullable
@Override
public String destinationKind(ReceiveMessageRequest request) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}
@Nullable
@Override
public String destination(ReceiveMessageRequest request) {
return request.getMessageQueue().getTopic().getName();
}
@Override
public boolean temporaryDestination(ReceiveMessageRequest request) {
return false;
}
@Nullable
@Override
public String protocol(ReceiveMessageRequest request) {
return null;
}
@Nullable
@Override
public String protocolVersion(ReceiveMessageRequest request) {
return null;
}
@Nullable
@Override
public String url(ReceiveMessageRequest request) {
return null;
}
@Nullable
@Override
public String conversationId(ReceiveMessageRequest request) {
return null;
}
@Nullable
@Override
public Long messagePayloadSize(ReceiveMessageRequest request) {
return null;
}
@Nullable
@Override
public Long messagePayloadCompressedSize(ReceiveMessageRequest request) {
return null;
}
@Nullable
@Override
public String messageId(ReceiveMessageRequest request, @Nullable List<MessageView> unused) {
return null;
}
}

View File

@ -21,6 +21,7 @@ public final class RocketMqInstrumentationModule extends InstrumentationModule {
@Override @Override
public List<TypeInstrumentation> typeInstrumentations() { public List<TypeInstrumentation> typeInstrumentations() {
return asList( return asList(
new RocketMqPublishingMessageImplInstrumentation(), new RocketMqProducerInstrumentation()); new PublishingMessageImplInstrumentation(), new ProducerImplInstrumentation(),
new ConsumerImplInstrumentation(), new ConsumeServiceInstrumentation());
} }
} }

View File

@ -5,16 +5,22 @@
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import apache.rocketmq.v2.ReceiveMessageRequest;
import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
import java.util.List; import java.util.List;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
@ -25,7 +31,6 @@ final class RocketMqInstrumenterFactory {
public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> createProducerInstrumenter( public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> createProducerInstrumenter(
OpenTelemetry openTelemetry, List<String> capturedHeaders) { OpenTelemetry openTelemetry, List<String> capturedHeaders) {
RocketMqProducerAttributeGetter getter = RocketMqProducerAttributeGetter.INSTANCE; RocketMqProducerAttributeGetter getter = RocketMqProducerAttributeGetter.INSTANCE;
MessageOperation operation = MessageOperation.SEND; MessageOperation operation = MessageOperation.SEND;
@ -38,15 +43,62 @@ final class RocketMqInstrumenterFactory {
INSTRUMENTATION_NAME, INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation)) MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(attributesExtractor) .addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE) .addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE);
return instrumenterBuilder.buildProducerInstrumenter(MessageMapSetter.INSTANCE);
}
public static Instrumenter<ReceiveMessageRequest, List<MessageView>>
createConsumerReceiveInstrumenter(
OpenTelemetry openTelemetry, List<String> capturedHeaders, boolean enabled) {
RocketMqConsumerReceiveAttributeGetter getter = RocketMqConsumerReceiveAttributeGetter.INSTANCE;
MessageOperation operation = MessageOperation.RECEIVE;
MessagingAttributesExtractor<ReceiveMessageRequest, List<MessageView>> attributesExtractor =
buildMessagingAttributesExtractor(getter, operation, capturedHeaders);
InstrumenterBuilder<ReceiveMessageRequest, List<MessageView>> instrumenterBuilder =
Instrumenter.<ReceiveMessageRequest, List<MessageView>>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.setEnabled(enabled)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(RocketMqConsumerReceiveAttributeExtractor.INSTANCE);
return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}
public static Instrumenter<MessageView, ConsumeResult> createConsumerProcessInstrumenter(
OpenTelemetry openTelemetry,
List<String> capturedHeaders,
boolean receiveInstrumentationEnabled) {
RocketMqConsumerProcessAttributeGetter getter = RocketMqConsumerProcessAttributeGetter.INSTANCE;
MessageOperation operation = MessageOperation.PROCESS;
MessagingAttributesExtractor<MessageView, ConsumeResult> attributesExtractor =
buildMessagingAttributesExtractor(getter, operation, capturedHeaders);
InstrumenterBuilder<MessageView, ConsumeResult> instrumenterBuilder =
Instrumenter.<MessageView, ConsumeResult>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(RocketMqConsumerProcessAttributeExtractor.INSTANCE)
.setSpanStatusExtractor( .setSpanStatusExtractor(
(spanStatusBuilder, message, sendReceipt, error) -> { (spanStatusBuilder, messageView, consumeResult, error) -> {
if (null != error) { if (error != null || ConsumeResult.FAILURE.equals(consumeResult)) {
spanStatusBuilder.setStatus(StatusCode.ERROR); spanStatusBuilder.setStatus(StatusCode.ERROR);
} }
}); });
return instrumenterBuilder.buildProducerInstrumenter(MapSetter.INSTANCE); if (receiveInstrumentationEnabled) {
SpanLinksExtractor<MessageView> spanLinksExtractor =
new PropagatorBasedSpanLinksExtractor<>(
openTelemetry.getPropagators().getTextMapPropagator(), MessageMapGetter.INSTANCE);
instrumenterBuilder.addSpanLinksExtractor(spanLinksExtractor);
return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}
return instrumenterBuilder.buildConsumerInstrumenter(MessageMapGetter.INSTANCE);
} }
private static <T, R> MessagingAttributesExtractor<T, R> buildMessagingAttributesExtractor( private static <T, R> MessagingAttributesExtractor<T, R> buildMessagingAttributesExtractor(

View File

@ -5,25 +5,52 @@
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import apache.rocketmq.v2.ReceiveMessageRequest;
import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import java.util.List;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
public final class RocketMqSingletons { public final class RocketMqSingletons {
private static final Instrumenter<PublishingMessageImpl, SendReceiptImpl> PRODUCER_INSTRUMENTER; private static final Instrumenter<PublishingMessageImpl, SendReceiptImpl> PRODUCER_INSTRUMENTER;
private static final Instrumenter<ReceiveMessageRequest, List<MessageView>>
CONSUMER_RECEIVE_INSTRUMENTER;
private static final Instrumenter<MessageView, ConsumeResult> CONSUMER_PROCESS_INSTRUMENTER;
static { static {
OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
List<String> messagingHeaders = ExperimentalConfig.get().getMessagingHeaders();
boolean receiveInstrumentationEnabled =
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();
PRODUCER_INSTRUMENTER = PRODUCER_INSTRUMENTER =
RocketMqInstrumenterFactory.createProducerInstrumenter( RocketMqInstrumenterFactory.createProducerInstrumenter(openTelemetry, messagingHeaders);
GlobalOpenTelemetry.get(), ExperimentalConfig.get().getMessagingHeaders()); CONSUMER_RECEIVE_INSTRUMENTER =
RocketMqInstrumenterFactory.createConsumerReceiveInstrumenter(
openTelemetry, messagingHeaders, receiveInstrumentationEnabled);
CONSUMER_PROCESS_INSTRUMENTER =
RocketMqInstrumenterFactory.createConsumerProcessInstrumenter(
openTelemetry, messagingHeaders, receiveInstrumentationEnabled);
} }
public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> producerInstrumenter() { public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> producerInstrumenter() {
return PRODUCER_INSTRUMENTER; return PRODUCER_INSTRUMENTER;
} }
public static Instrumenter<ReceiveMessageRequest, List<MessageView>>
consumerReceiveInstrumenter() {
return CONSUMER_RECEIVE_INSTRUMENTER;
}
public static Instrumenter<MessageView, ConsumeResult> consumerProcessInstrumenter() {
return CONSUMER_PROCESS_INSTRUMENTER;
}
private RocketMqSingletons() {} private RocketMqSingletons() {}
} }

View File

@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback;
public final class SendSpanFinishingCallback implements FutureCallback<SendReceiptImpl> {
private final Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter;
private final Context context;
private final PublishingMessageImpl message;
public SendSpanFinishingCallback(
Instrumenter<PublishingMessageImpl, SendReceiptImpl> instrumenter,
Context context,
PublishingMessageImpl message) {
this.instrumenter = instrumenter;
this.context = context;
this.message = message;
}
@Override
public void onSuccess(SendReceiptImpl sendReceipt) {
instrumenter.end(context, message, sendReceipt, null);
}
@Override
public void onFailure(Throwable t) {
instrumenter.end(context, message, null, t);
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import java.time.Instant;
public class Timer {
public static Timer start() {
return new Timer(Instant.now(), System.nanoTime());
}
private final Instant startTime;
private final long startNanoTime;
private Timer(Instant startTime, long startNanoTime) {
this.startTime = startTime;
this.startNanoTime = startNanoTime;
}
public Instant startTime() {
return startTime;
}
public Instant now() {
long durationNanos = System.nanoTime() - startNanoTime;
return startTime().plusNanos(durationNanos);
}
}

View File

@ -8,13 +8,18 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.util.Map; import java.util.Map;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
public class VirtualFieldStore { public class VirtualFieldStore {
private static final VirtualField<PublishingMessageImpl, Context> messageContextField = private static final VirtualField<PublishingMessageImpl, Context> messageContextField =
VirtualField.find(PublishingMessageImpl.class, Context.class); VirtualField.find(PublishingMessageImpl.class, Context.class);
private static final VirtualField<MessageView, Context> messageViewContextField =
VirtualField.find(MessageView.class, Context.class);
private static final VirtualField<PublishingMessageImpl, Map<String, String>> private static final VirtualField<PublishingMessageImpl, Map<String, String>>
messageExtraPropertiesField = VirtualField.find(PublishingMessageImpl.class, Map.class); messageExtraPropertiesField = VirtualField.find(PublishingMessageImpl.class, Map.class);
private static final VirtualField<MessageView, String> messageConsumerGroupField =
VirtualField.find(MessageView.class, String.class);
private VirtualFieldStore() {} private VirtualFieldStore() {}
@ -22,10 +27,18 @@ public class VirtualFieldStore {
return messageContextField.get(message); return messageContextField.get(message);
} }
public static Context getContextByMessage(MessageView messageView) {
return messageViewContextField.get(messageView);
}
public static void setContextByMessage(PublishingMessageImpl message, Context context) { public static void setContextByMessage(PublishingMessageImpl message, Context context) {
messageContextField.set(message, context); messageContextField.set(message, context);
} }
public static void setContextByMessage(MessageView message, Context context) {
messageViewContextField.set(message, context);
}
public static Map<String, String> getExtraPropertiesByMessage(PublishingMessageImpl message) { public static Map<String, String> getExtraPropertiesByMessage(PublishingMessageImpl message) {
return messageExtraPropertiesField.get(message); return messageExtraPropertiesField.get(message);
} }
@ -34,4 +47,12 @@ public class VirtualFieldStore {
PublishingMessageImpl message, Map<String, String> extraProperties) { PublishingMessageImpl message, Map<String, String> extraProperties) {
messageExtraPropertiesField.set(message, extraProperties); messageExtraPropertiesField.set(message, extraProperties);
} }
public static String getConsumerGroupByMessage(MessageView messageView) {
return messageConsumerGroupField.get(messageView);
}
public static void setConsumerGroupByMessage(MessageView messageView, String consumerGroup) {
messageConsumerGroupField.set(messageView, consumerGroup);
}
} }

View File

@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
public class RocketMqClientSuppressReceiveSpanTest
extends AbstractRocketMqClientSuppressReceiveSpanTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
}

View File

@ -0,0 +1,154 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v5_0;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_ID;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.util.ThrowingSupplier;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
public abstract class AbstractRocketMqClientSuppressReceiveSpanTest {
private static final RocketMqProxyContainer container = new RocketMqProxyContainer();
protected abstract InstrumentationExtension testing();
@BeforeAll
static void setUp() {
container.start();
}
@AfterAll
static void tearDown() {
container.close();
}
@Test
void testSendAndConsumeMessage() throws Throwable {
ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build();
// Inner topic of the container.
String topic = "normal-topic-0";
ClientServiceProvider provider = ClientServiceProvider.loadService();
String consumerGroup = "group-normal-topic-0";
String tag = "tagA";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
try (PushConsumer ignored =
provider
.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(
messageView -> {
testing().runWithSpan("child", () -> {});
return ConsumeResult.SUCCESS;
})
.build()) {
try (Producer producer =
provider
.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.build()) {
String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"};
byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
Message message =
provider
.newMessageBuilder()
.setTopic(topic)
.setTag(tag)
.setKeys(keys)
.setBody(body)
.build();
SendReceipt sendReceipt =
testing()
.runWithSpan(
"parent",
(ThrowingSupplier<SendReceipt, Throwable>) () -> producer.send(message));
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasKind(SpanKind.PRODUCER)
.hasName(topic + " send")
.hasStatus(StatusData.unset())
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL),
equalTo(
MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
equalTo(MESSAGING_SYSTEM, "rocketmq"),
equalTo(
MESSAGING_MESSAGE_ID,
sendReceipt.getMessageId().toString()),
equalTo(
MESSAGING_DESTINATION_KIND,
SemanticAttributes.MessagingDestinationKindValues.TOPIC),
equalTo(MESSAGING_DESTINATION, topic)),
span ->
span.hasKind(SpanKind.CONSUMER)
.hasName(topic + " process")
.hasStatus(StatusData.unset())
// As the child of send span.
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
equalTo(
MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
equalTo(MESSAGING_SYSTEM, "rocketmq"),
equalTo(
MESSAGING_MESSAGE_ID,
sendReceipt.getMessageId().toString()),
equalTo(
MESSAGING_DESTINATION_KIND,
SemanticAttributes.MessagingDestinationKindValues.TOPIC),
equalTo(MESSAGING_DESTINATION, topic),
equalTo(MESSAGING_OPERATION, "process")),
span ->
span.hasName("child")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(2))));
}
}
}
}

View File

@ -10,87 +10,110 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSA
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_ID;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL;
import io.opentelemetry.instrumentation.test.utils.PortUtils; import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.util.ThrowingSupplier;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
public abstract class AbstractRocketMqClientTest { public abstract class AbstractRocketMqClientTest {
private static final RocketMqProxyContainer container = new RocketMqProxyContainer();
protected abstract InstrumentationExtension testing(); protected abstract InstrumentationExtension testing();
// TODO(aaron-ai): replace it by the official image. @BeforeAll
private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0"; static void setUp() {
container.start();
}
@AfterAll
static void tearDown() {
container.close();
}
// We still need this container type to do fixed-port-mapping.
@SuppressWarnings({"deprecation", "rawtypes", "resource"})
@Test @Test
public void testSendMessage() throws ClientException { void testSendAndConsumeMessage() throws Throwable {
int proxyPort = PortUtils.findOpenPorts(4); ClientConfiguration clientConfiguration =
int brokerPort = proxyPort + 1; ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build();
int brokerHaPort = proxyPort + 2; // Inner topic of the container.
int namesrvPort = proxyPort + 3; String topic = "normal-topic-0";
try (GenericContainer<?> container = ClientServiceProvider provider = ClientServiceProvider.loadService();
new FixedHostPortGenericContainer(IMAGE_NAME) String consumerGroup = "group-normal-topic-0";
.withFixedExposedPort(proxyPort, proxyPort) String tag = "tagA";
.withEnv("rocketmq.broker.port", String.valueOf(brokerPort)) FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
.withEnv("rocketmq.proxy.port", String.valueOf(proxyPort)) try (PushConsumer ignored =
.withEnv("rocketmq.broker.ha.port", String.valueOf(brokerHaPort)) provider
.withEnv("rocketmq.namesrv.port", String.valueOf(namesrvPort)) .newPushConsumerBuilder()
.withExposedPorts(proxyPort)) { .setClientConfiguration(clientConfiguration)
// Start the container. .setConsumerGroup(consumerGroup)
container.start(); .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
String endpoints = "127.0.0.1:" + proxyPort; .setMessageListener(
ClientConfiguration clientConfiguration = messageView -> {
ClientConfiguration.newBuilder().setEndpoints(endpoints).build(); testing().runWithSpan("child", () -> {});
// Inner topic of the container. return ConsumeResult.SUCCESS;
String topic = "normal-topic-0"; })
ClientServiceProvider provider = ClientServiceProvider.loadService(); .build()) {
Producer producer = try (Producer producer =
provider provider
.newProducerBuilder() .newProducerBuilder()
.setClientConfiguration(clientConfiguration) .setClientConfiguration(clientConfiguration)
.setTopics(topic) .setTopics(topic)
.build(); .build()) {
String tag = "tagA"; String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"};
String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; byte[] body = "foobar".getBytes(StandardCharsets.UTF_8);
byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); Message message =
Message message = provider
provider .newMessageBuilder()
.newMessageBuilder() .setTopic(topic)
.setTopic(topic) .setTag(tag)
.setTag(tag) .setKeys(keys)
.setKeys(keys) .setBody(body)
.setBody(body) .build();
.build();
SendReceipt sendReceipt = producer.send(message); SendReceipt sendReceipt =
testing() testing()
.waitAndAssertTraces( .runWithSpan(
traceAssert -> "parent",
traceAssert.hasSpansSatisfyingExactly( (ThrowingSupplier<SendReceipt, Throwable>) () -> producer.send(message));
spanDataAssert -> AtomicReference<SpanData> sendSpanData = new AtomicReference<>();
spanDataAssert testing()
.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasKind(SpanKind.PRODUCER)
.hasName(topic + " send") .hasName(topic + " send")
.hasStatus(StatusData.unset()) .hasStatus(StatusData.unset())
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly( .hasAttributesSatisfyingExactly(
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
@ -102,7 +125,51 @@ public abstract class AbstractRocketMqClientTest {
equalTo( equalTo(
MESSAGING_DESTINATION_KIND, MESSAGING_DESTINATION_KIND,
SemanticAttributes.MessagingDestinationKindValues.TOPIC), SemanticAttributes.MessagingDestinationKindValues.TOPIC),
equalTo(MESSAGING_DESTINATION, topic)))); equalTo(MESSAGING_DESTINATION, topic)));
sendSpanData.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasKind(SpanKind.CONSUMER)
.hasName(topic + " receive")
.hasStatus(StatusData.unset())
.hasAttributesSatisfyingExactly(
equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup),
equalTo(MESSAGING_SYSTEM, "rocketmq"),
equalTo(
MESSAGING_DESTINATION_KIND,
SemanticAttributes.MessagingDestinationKindValues.TOPIC),
equalTo(MESSAGING_DESTINATION, topic),
equalTo(MESSAGING_OPERATION, "receive")),
span ->
span.hasKind(SpanKind.CONSUMER)
.hasName(topic + " process")
.hasStatus(StatusData.unset())
// Link to send span.
.hasLinks(LinkData.create(sendSpanData.get().getSpanContext()))
// As the child of receive span.
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag),
equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)),
equalTo(
MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
equalTo(MESSAGING_SYSTEM, "rocketmq"),
equalTo(
MESSAGING_MESSAGE_ID,
sendReceipt.getMessageId().toString()),
equalTo(
MESSAGING_DESTINATION_KIND,
SemanticAttributes.MessagingDestinationKindValues.TOPIC),
equalTo(MESSAGING_DESTINATION, topic),
equalTo(MESSAGING_OPERATION, "process")),
span ->
span.hasName("child")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(1))));
}
} }
} }
} }

View File

@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmqclient.v5_0;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
public class RocketMqProxyContainer {
// TODO(aaron-ai): replace it by the official image.
private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0";
private final GenericContainer<?> container;
final String endpoints;
// We still need this container type to do fixed-port-mapping.
@SuppressWarnings({"resource", "deprecation", "rawtypes"})
RocketMqProxyContainer() {
int proxyPort = PortUtils.findOpenPorts(4);
int brokerPort = proxyPort + 1;
int brokerHaPort = proxyPort + 2;
int namesrvPort = proxyPort + 3;
endpoints = "127.0.0.1:" + proxyPort;
container =
new FixedHostPortGenericContainer(IMAGE_NAME)
.withFixedExposedPort(proxyPort, proxyPort)
.withEnv("rocketmq.broker.port", String.valueOf(brokerPort))
.withEnv("rocketmq.proxy.port", String.valueOf(proxyPort))
.withEnv("rocketmq.broker.ha.port", String.valueOf(brokerHaPort))
.withEnv("rocketmq.namesrv.port", String.valueOf(namesrvPort))
.withExposedPorts(proxyPort);
}
void start() {
container.start();
}
void close() {
container.close();
}
}